VOL-2292: Create application for scale testing of BAL

- Base framework created and is functional
- Able to provision ATT techprofile with scheduler, queue and eapol
  flow creation.
- Extensible framework provided to add various operator workflows
- README has details about how to build, run, configure and extend
  the framework.

Change-Id: I71774959281881278c14b48bee7f9adc0b81ec68
diff --git a/core/att_workflow.go b/core/att_workflow.go
new file mode 100644
index 0000000..f20d866
--- /dev/null
+++ b/core/att_workflow.go
@@ -0,0 +1,252 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package core
+
+import (
+	"errors"
+	"github.com/opencord/openolt-scale-tester/config"
+	"github.com/opencord/voltha-lib-go/v2/pkg/log"
+	"github.com/opencord/voltha-lib-go/v2/pkg/ponresourcemanager"
+	oop "github.com/opencord/voltha-protos/v2/go/openolt"
+	tp_pb "github.com/opencord/voltha-protos/v2/go/tech_profile"
+	"golang.org/x/net/context"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+func init() {
+	_, _ = log.AddPackage(log.JSON, log.DebugLevel, nil)
+}
+
+// A dummy struct to comply with the WorkFlow interface.
+type AttWorkFlow struct {
+}
+
+func ProvisionAttNniTrapFlow(oo oop.OpenoltClient, config *config.OpenOltScaleTesterConfig, rsrMgr *OpenOltResourceMgr) error {
+	var flowID []uint32
+	var err error
+
+	if flowID, err = rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].GetResourceID(uint32(config.NniIntfID),
+		ponresourcemanager.FLOW_ID, 1); err != nil {
+		return err
+	}
+
+	flowClassifier := &oop.Classifier{EthType: 2048, IpProto: 17, SrcPort: 67, DstPort: 68, PktTagType: "double_tag"}
+	actionCmd := &oop.ActionCmd{TrapToHost: true}
+	actionInfo := &oop.Action{Cmd: actionCmd}
+
+	flow := oop.Flow{AccessIntfId: -1, OnuId: -1, UniId: -1, FlowId: flowID[0],
+		FlowType: "downstream", AllocId: -1, GemportId: -1,
+		Classifier: flowClassifier, Action: actionInfo,
+		Priority: 1000, PortNo: 65536}
+
+	_, err = oo.FlowAdd(context.Background(), &flow)
+
+	st, _ := status.FromError(err)
+	if st.Code() == codes.AlreadyExists {
+		log.Debugw("Flow already exists", log.Fields{"err": err, "deviceFlow": flow})
+		return nil
+	}
+
+	if err != nil {
+		log.Errorw("Failed to Add flow to device", log.Fields{"err": err, "deviceFlow": flow})
+		rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].FreeResourceID(uint32(config.NniIntfID), ponresourcemanager.FLOW_ID, flowID)
+		return err
+	}
+	log.Debugw("Flow added to device successfully ", log.Fields{"flow": flow})
+
+	return nil
+}
+
+func getTrafficSched(subs *Subscriber, direction tp_pb.Direction) []*tp_pb.TrafficScheduler {
+	var SchedCfg *tp_pb.SchedulerConfig
+
+	if direction == tp_pb.Direction_DOWNSTREAM {
+		SchedCfg = subs.RsrMgr.ResourceMgrs[subs.PonIntf].TechProfileMgr.
+			GetDsScheduler(subs.TpInstance[subs.TestConfig.TpIDList[0]])
+
+	} else {
+		SchedCfg = subs.RsrMgr.ResourceMgrs[subs.PonIntf].TechProfileMgr.
+			GetUsScheduler(subs.TpInstance[subs.TestConfig.TpIDList[0]])
+	}
+
+	// hard-code for now
+	cir := 16000
+	cbs := 5000
+	eir := 16000
+	ebs := 5000
+	pir := cir + eir
+	pbs := cbs + ebs
+
+	TrafficShaping := &tp_pb.TrafficShapingInfo{Cir: uint32(cir), Cbs: uint32(cbs), Pir: uint32(pir), Pbs: uint32(pbs)}
+
+	TrafficSched := []*tp_pb.TrafficScheduler{subs.RsrMgr.ResourceMgrs[subs.PonIntf].TechProfileMgr.
+		GetTrafficScheduler(subs.TpInstance[subs.TestConfig.TpIDList[0]], SchedCfg, TrafficShaping)}
+
+	return TrafficSched
+}
+
+func (att AttWorkFlow) ProvisionScheds(subs *Subscriber) error {
+	var trafficSched []*tp_pb.TrafficScheduler
+
+	log.Info("provisioning-scheds")
+
+	if trafficSched = getTrafficSched(subs, tp_pb.Direction_DOWNSTREAM); trafficSched == nil {
+		log.Error("ds-traffic-sched-is-nil")
+		return errors.New(ReasonCodeToReasonString(SCHED_CREATION_FAILED))
+	}
+
+	log.Debugw("Sending Traffic scheduler create to device",
+		log.Fields{"Direction": tp_pb.Direction_DOWNSTREAM, "TrafficScheds": trafficSched})
+	if _, err := subs.OpenOltClient.CreateTrafficSchedulers(context.Background(), &tp_pb.TrafficSchedulers{
+		IntfId: subs.PonIntf, OnuId: subs.OnuID,
+		UniId: subs.UniID, PortNo: subs.UniPortNo,
+		TrafficScheds: trafficSched}); err != nil {
+		log.Errorw("Failed to create traffic schedulers", log.Fields{"error": err})
+		return errors.New(ReasonCodeToReasonString(SCHED_CREATION_FAILED))
+	}
+
+	if trafficSched = getTrafficSched(subs, tp_pb.Direction_UPSTREAM); trafficSched == nil {
+		log.Error("us-traffic-sched-is-nil")
+		return errors.New(ReasonCodeToReasonString(SCHED_CREATION_FAILED))
+	}
+
+	log.Debugw("Sending Traffic scheduler create to device",
+		log.Fields{"Direction": tp_pb.Direction_UPSTREAM, "TrafficScheds": trafficSched})
+	if _, err := subs.OpenOltClient.CreateTrafficSchedulers(context.Background(), &tp_pb.TrafficSchedulers{
+		IntfId: subs.PonIntf, OnuId: subs.OnuID,
+		UniId: subs.UniID, PortNo: subs.UniPortNo,
+		TrafficScheds: trafficSched}); err != nil {
+		log.Errorw("Failed to create traffic schedulers", log.Fields{"error": err})
+		return errors.New(ReasonCodeToReasonString(SCHED_CREATION_FAILED))
+	}
+
+	return nil
+}
+
+func getTrafficQueues(subs *Subscriber, direction tp_pb.Direction) []*tp_pb.TrafficQueue {
+
+	trafficQueues := subs.RsrMgr.ResourceMgrs[subs.PonIntf].TechProfileMgr.
+		GetTrafficQueues(subs.TpInstance[subs.TestConfig.TpIDList[0]], direction)
+
+	return trafficQueues
+}
+
+func (att AttWorkFlow) ProvisionQueues(subs *Subscriber) error {
+	log.Info("provisioning-queues")
+
+	var trafficQueues []*tp_pb.TrafficQueue
+	if trafficQueues = getTrafficQueues(subs, tp_pb.Direction_DOWNSTREAM); trafficQueues == nil {
+		log.Error("Failed to create traffic queues")
+		return errors.New(ReasonCodeToReasonString(QUEUE_CREATION_FAILED))
+	}
+
+	// On receiving the CreateTrafficQueues request, the driver should create corresponding
+	// downstream queues.
+	log.Debugw("Sending Traffic Queues create to device",
+		log.Fields{"Direction": tp_pb.Direction_DOWNSTREAM, "TrafficQueues": trafficQueues})
+	if _, err := subs.OpenOltClient.CreateTrafficQueues(context.Background(),
+		&tp_pb.TrafficQueues{IntfId: subs.PonIntf, OnuId: subs.OnuID,
+			UniId: subs.UniID, PortNo: subs.UniPortNo,
+			TrafficQueues: trafficQueues}); err != nil {
+		log.Errorw("Failed to create traffic queues in device", log.Fields{"error": err})
+		return errors.New(ReasonCodeToReasonString(QUEUE_CREATION_FAILED))
+	}
+
+	if trafficQueues = getTrafficQueues(subs, tp_pb.Direction_UPSTREAM); trafficQueues == nil {
+		log.Error("Failed to create traffic queues")
+		return errors.New(ReasonCodeToReasonString(QUEUE_CREATION_FAILED))
+	}
+
+	// On receiving the CreateTrafficQueues request, the driver should create corresponding
+	// upstream queues.
+	log.Debugw("Sending Traffic Queues create to device",
+		log.Fields{"Direction": tp_pb.Direction_UPSTREAM, "TrafficQueues": trafficQueues})
+	if _, err := subs.OpenOltClient.CreateTrafficQueues(context.Background(),
+		&tp_pb.TrafficQueues{IntfId: subs.PonIntf, OnuId: subs.OnuID,
+			UniId: subs.UniID, PortNo: subs.UniPortNo,
+			TrafficQueues: trafficQueues}); err != nil {
+		log.Errorw("Failed to create traffic queues in device", log.Fields{"error": err})
+		return errors.New(ReasonCodeToReasonString(QUEUE_CREATION_FAILED))
+	}
+
+	return nil
+}
+
+func (att AttWorkFlow) ProvisionEapFlow(subs *Subscriber) error {
+	log.Info("provisioning-eap--att")
+
+	var err error
+	var flowID []uint32
+	var gemPortIDs []uint32
+
+	var allocID = subs.TpInstance[subs.TestConfig.TpIDList[0]].UsScheduler.AllocID
+
+	for _, gem := range subs.TpInstance[subs.TestConfig.TpIDList[0]].UpstreamGemPortAttributeList {
+		gemPortIDs = append(gemPortIDs, gem.GemportID)
+	}
+
+	for _, gemID := range gemPortIDs {
+		if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(uint32(subs.PonIntf),
+			ponresourcemanager.FLOW_ID, 1); err != nil {
+			return errors.New(ReasonCodeToReasonString(FLOW_ADD_FAILED))
+		}
+		flowClassifier := &oop.Classifier{EthType: 34958, OVid: subs.Ctag, PktTagType: "single_tag"}
+		actionCmd := &oop.ActionCmd{TrapToHost: true}
+		actionInfo := &oop.Action{Cmd: actionCmd}
+
+		flow := oop.Flow{AccessIntfId: int32(subs.PonIntf), OnuId: int32(subs.OnuID),
+			UniId: int32(subs.UniID), FlowId: flowID[0],
+			FlowType: "upstream", AllocId: int32(allocID), GemportId: int32(gemID),
+			Classifier: flowClassifier, Action: actionInfo,
+			Priority: 1000, PortNo: subs.UniPortNo}
+
+		_, err = subs.OpenOltClient.FlowAdd(context.Background(), &flow)
+
+		st, _ := status.FromError(err)
+		if st.Code() == codes.AlreadyExists {
+			log.Debugw("Flow already exists", log.Fields{"err": err, "deviceFlow": flow})
+			continue
+		}
+
+		if err != nil {
+			log.Errorw("Failed to Add flow to device", log.Fields{"err": err, "deviceFlow": flow})
+			subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(uint32(subs.PonIntf),
+				ponresourcemanager.FLOW_ID, flowID)
+			return errors.New(ReasonCodeToReasonString(FLOW_ADD_FAILED))
+		}
+		log.Debugw("Flow added to device successfully ", log.Fields{"flow": flow})
+	}
+	return nil
+}
+
+func (att AttWorkFlow) ProvisionDhcpFlow(subs *Subscriber) error {
+	// TODO
+	log.Info("provisioning-dhcp")
+	return nil
+}
+
+func (att AttWorkFlow) ProvisionIgmpFlow(subs *Subscriber) error {
+	log.Info("att-workflow-does-not-support-igmp-yet--nothing-to-do")
+	return nil
+}
+
+func (att AttWorkFlow) ProvisionHsiaFlow(subs *Subscriber) error {
+	// TODO
+	log.Info("provisioning-hsia")
+	return nil
+}
diff --git a/core/olt_manager.go b/core/olt_manager.go
new file mode 100644
index 0000000..122c18d
--- /dev/null
+++ b/core/olt_manager.go
@@ -0,0 +1,441 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package core
+
+import (
+	"context"
+	"encoding/hex"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"github.com/cenkalti/backoff/v3"
+	"github.com/opencord/openolt-scale-tester/config"
+	"github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore"
+	"github.com/opencord/voltha-lib-go/v2/pkg/log"
+	"github.com/opencord/voltha-lib-go/v2/pkg/techprofile"
+	oop "github.com/opencord/voltha-protos/v2/go/openolt"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+	"io"
+	"io/ioutil"
+	"os"
+	"strconv"
+	"sync"
+	"syscall"
+	"time"
+)
+
+const (
+	ReasonOk          = "OK"
+	TechProfileKVPath = "service/voltha/technology_profiles/%s/%d" // service/voltha/technology_profiles/xgspon/<tech_profile_tableID>
+)
+
+type OnuDeviceKey struct {
+	onuID    uint32
+	ponInfID uint32
+}
+
+type OpenOltManager struct {
+	ipPort        string
+	deviceInfo    *oop.DeviceInfo
+	OnuDeviceMap  map[OnuDeviceKey]*OnuDevice `json:"onuDeviceMap"`
+	TechProfile   map[uint32]*techprofile.TechProfileIf
+	clientConn    *grpc.ClientConn
+	openOltClient oop.OpenoltClient
+	testConfig    *config.OpenOltScaleTesterConfig
+	rsrMgr        *OpenOltResourceMgr
+	lockRsrAlloc  sync.RWMutex
+}
+
+func init() {
+	_, _ = log.AddPackage(log.JSON, log.DebugLevel, nil)
+}
+
+func NewOpenOltManager(ipPort string) *OpenOltManager {
+	log.Infow("initialized openolt manager with ipPort", log.Fields{"ipPort": ipPort})
+	return &OpenOltManager{
+		ipPort:       ipPort,
+		OnuDeviceMap: make(map[OnuDeviceKey]*OnuDevice),
+		lockRsrAlloc: sync.RWMutex{},
+	}
+}
+
+func (om *OpenOltManager) readAndLoadTPsToEtcd() {
+	var byteValue []byte
+	var err error
+	// Verify that etcd is up before starting the application.
+	etcdIpPort := "http://" + om.testConfig.KVStoreHost + ":" + strconv.Itoa(om.testConfig.KVStorePort)
+	client, err := kvstore.NewEtcdClient(etcdIpPort, 5)
+	if err != nil || client == nil {
+		log.Fatal("error-initializing-etcd-client")
+		return
+	}
+
+	// Load TPs to etcd for each of the specified tech-profiles
+	for _, tpID := range om.testConfig.TpIDList {
+		// Below should translate to something like "/app/ATT-64.json"
+		// The TP file should exist.
+		tpFilePath := "/app/" + om.testConfig.WorkflowName + "-" + strconv.Itoa(tpID) + ".json"
+		// Open our jsonFile
+		jsonFile, err := os.Open(tpFilePath)
+		// if we os.Open returns an error then handle it
+		if err != nil {
+			log.Fatalw("could-not-find-tech-profile", log.Fields{"err": err, "tpFile": tpFilePath})
+		}
+		log.Debugw("tp-file-opened-successfully", log.Fields{"tpFile": tpFilePath})
+
+		// read our opened json file as a byte array.
+		if byteValue, err = ioutil.ReadAll(jsonFile); err != nil {
+			log.Fatalw("could-not-read-tp-file", log.Fields{"err": err, "tpFile": tpFilePath})
+		}
+
+		var tp techprofile.TechProfile
+
+		if err = json.Unmarshal(byteValue, &tp); err != nil {
+			log.Fatalw("could-not-unmarshal-tp", log.Fields{"err": err, "tpFile": tpFilePath})
+		} else {
+			log.Infow("tp-read-from-file", log.Fields{"tp": tp, "tpFile": tpFilePath})
+		}
+		kvPath := fmt.Sprintf(TechProfileKVPath, om.deviceInfo.Technology, tpID)
+		tpJson, err := json.Marshal(tp)
+		err = client.Put(kvPath, tpJson, 2)
+		if err != nil {
+			log.Fatalw("tp-put-to-etcd-failed", log.Fields{"tpPath": kvPath, "err": err})
+		}
+		// verify the PUT succeeded.
+		kvResult, err := client.Get(kvPath, 2)
+		if kvResult == nil {
+			log.Fatal("tp-not-found-on-kv-after-load", log.Fields{"key": kvPath, "err": err})
+		} else {
+			var KvTpIns techprofile.TechProfile
+			var resPtr = &KvTpIns
+			if value, err := kvstore.ToByte(kvResult.Value); err == nil {
+				if err = json.Unmarshal(value, resPtr); err != nil {
+					log.Fatal("error-unmarshal-kv-result", log.Fields{"err": err, "key": kvPath, "value": value})
+				} else {
+					log.Infow("verified-ok-that-tp-load-was-good", log.Fields{"tpID": tpID, "kvPath": kvPath})
+					_ = jsonFile.Close()
+					continue
+				}
+			}
+		}
+	}
+}
+
+func (om *OpenOltManager) Start(testConfig *config.OpenOltScaleTesterConfig) error {
+	var err error
+	om.testConfig = testConfig
+
+	// Establish gRPC connection with the device
+	if om.clientConn, err = grpc.Dial(om.ipPort, grpc.WithInsecure(), grpc.WithBlock()); err != nil {
+		log.Errorw("Failed to dial device", log.Fields{"ipPort": om.ipPort, "err": err})
+		return err
+	}
+	om.openOltClient = oop.NewOpenoltClient(om.clientConn)
+
+	// Populate Device Info
+	if deviceInfo, err := om.populateDeviceInfo(); err != nil {
+		log.Error("error fetching device info", log.Fields{"err": err, "deviceInfo": deviceInfo})
+		return err
+	}
+
+	// Read and load TPs to etcd.
+	om.readAndLoadTPsToEtcd()
+
+	log.Info("etcd-up-and-running--tp-loaded-successfully")
+
+	if om.rsrMgr = NewResourceMgr("ABCD", om.testConfig.KVStoreHost+":"+strconv.Itoa(om.testConfig.KVStorePort),
+		"etcd", "openolt", om.deviceInfo); om.rsrMgr == nil {
+		log.Error("Error while instantiating resource manager")
+		return errors.New("instantiating resource manager failed")
+	}
+
+	om.TechProfile = make(map[uint32]*techprofile.TechProfileIf)
+	if err = om.populateTechProfilePerPonPort(); err != nil {
+		log.Error("Error while populating tech profile mgr\n")
+		return errors.New("error-loading-tech-profile-per-ponPort")
+	}
+
+	// Start reading indications
+	go om.readIndications()
+
+	// Provision OLT NNI Trap flows as needed by the Workflow
+	if err = ProvisionNniTrapFlow(om.openOltClient, om.testConfig, om.rsrMgr); err != nil {
+		log.Error("failed-to-add-nni-trap-flow", log.Fields{"err": err})
+	}
+
+	// Provision ONUs one by one
+	go om.provisionONUs()
+
+	return nil
+
+}
+
+func (om *OpenOltManager) populateDeviceInfo() (*oop.DeviceInfo, error) {
+	var err error
+
+	if om.deviceInfo, err = om.openOltClient.GetDeviceInfo(context.Background(), new(oop.Empty)); err != nil {
+		log.Errorw("Failed to fetch device info", log.Fields{"err": err})
+		return nil, err
+	}
+
+	if om.deviceInfo == nil {
+		log.Errorw("Device info is nil", log.Fields{})
+		return nil, errors.New("failed to get device info from OLT")
+	}
+
+	log.Debugw("Fetched device info", log.Fields{"deviceInfo": om.deviceInfo})
+
+	return om.deviceInfo, nil
+}
+
+func (om *OpenOltManager) provisionONUs() {
+	var numOfONUsPerPon uint
+	var i, j, onuID uint32
+	var err error
+	oltChan := make(chan bool)
+	numOfONUsPerPon = om.testConfig.NumOfOnu / uint(om.deviceInfo.PonPorts)
+	if oddONUs := om.testConfig.NumOfOnu % uint(om.deviceInfo.PonPorts); oddONUs > 0 {
+		log.Warnw("Odd number ONUs left out of provisioning", log.Fields{"oddONUs": oddONUs})
+	}
+	totalOnusToProvision := numOfONUsPerPon * uint(om.deviceInfo.PonPorts)
+	log.Infow("***** all-onu-provision-started ******",
+		log.Fields{"totalNumOnus": totalOnusToProvision,
+			"numOfOnusPerPon": numOfONUsPerPon,
+			"numOfPons":       om.deviceInfo.PonPorts})
+	for i = 0; i < om.deviceInfo.PonPorts; i++ {
+		for j = 0; j < uint32(numOfONUsPerPon); j++ {
+			// TODO: More work with ONU provisioning
+			om.lockRsrAlloc.Lock()
+			sn := GenerateNextONUSerialNumber()
+			om.lockRsrAlloc.Unlock()
+			log.Debugw("provisioning onu", log.Fields{"onuID": j, "ponPort": i, "serialNum": sn})
+			if onuID, err = om.rsrMgr.GetONUID(i); err != nil {
+				log.Errorw("error getting onu id", log.Fields{"err": err})
+				continue
+			}
+			log.Infow("onu-provision-started-from-olt-manager", log.Fields{"onuId": onuID, "ponIntf": i})
+			go om.activateONU(i, onuID, sn, om.stringifySerialNumber(sn), oltChan)
+			// Wait for complete ONU provision to succeed, including provisioning the subscriber
+			<-oltChan
+			log.Infow("onu-provision-completed-from-olt-manager", log.Fields{"onuId": onuID, "ponIntf": i})
+
+			// Sleep for configured time before provisioning next ONU
+			time.Sleep(time.Duration(om.testConfig.TimeIntervalBetweenSubs))
+		}
+	}
+	log.Info("******** all-onu-provisioning-completed *******")
+
+	// TODO: We need to dump the results at the end. But below json marshall does not work
+	// We will need custom Marshal function.
+	/*
+		e, err := json.Marshal(om)
+		if err != nil {
+			fmt.Println(err)
+			return
+		}
+		fmt.Println(string(e))
+	*/
+
+	// Stop the process once the job is done
+	_ = syscall.Kill(syscall.Getpid(), syscall.SIGINT)
+}
+
+func (om *OpenOltManager) activateONU(intfID uint32, onuID uint32, serialNum *oop.SerialNumber, serialNumber string, oltCh chan bool) {
+	log.Debugw("activate-onu", log.Fields{"intfID": intfID, "onuID": onuID, "serialNum": serialNum, "serialNumber": serialNumber})
+	// TODO: need resource manager
+	var pir uint32 = 1000000
+	var onuDevice = OnuDevice{
+		SerialNum:     serialNumber,
+		OnuID:         onuID,
+		PonIntf:       intfID,
+		openOltClient: om.openOltClient,
+		testConfig:    om.testConfig,
+		rsrMgr:        om.rsrMgr,
+	}
+	var err error
+	onuDeviceKey := OnuDeviceKey{onuID: onuID, ponInfID: intfID}
+	Onu := oop.Onu{IntfId: intfID, OnuId: onuID, SerialNumber: serialNum, Pir: pir}
+	now := time.Now()
+	nanos := now.UnixNano()
+	milliStart := nanos / 1000000
+	onuDevice.OnuProvisionStartTime = time.Unix(0, nanos)
+	if _, err = om.openOltClient.ActivateOnu(context.Background(), &Onu); err != nil {
+		st, _ := status.FromError(err)
+		if st.Code() == codes.AlreadyExists {
+			log.Debug("ONU activation is in progress", log.Fields{"SerialNumber": serialNumber})
+			oltCh <- false
+		} else {
+			nanos = now.UnixNano()
+			milliEnd := nanos / 1000000
+			onuDevice.OnuProvisionEndTime = time.Unix(0, nanos)
+			onuDevice.OnuProvisionDurationInMs = milliEnd - milliStart
+			log.Errorw("activate-onu-failed", log.Fields{"Onu": Onu, "err ": err})
+			onuDevice.Reason = err.Error()
+			oltCh <- false
+		}
+	} else {
+		nanos = now.UnixNano()
+		milliEnd := nanos / 1000000
+		onuDevice.OnuProvisionEndTime = time.Unix(0, nanos)
+		onuDevice.OnuProvisionDurationInMs = milliEnd - milliStart
+		onuDevice.Reason = ReasonOk
+		log.Infow("activated-onu", log.Fields{"SerialNumber": serialNumber})
+	}
+
+	om.OnuDeviceMap[onuDeviceKey] = &onuDevice
+
+	// If ONU activation was success provision the ONU
+	if err == nil {
+		// start provisioning the ONU
+		go om.OnuDeviceMap[onuDeviceKey].Start(oltCh)
+	}
+}
+
+func (om *OpenOltManager) stringifySerialNumber(serialNum *oop.SerialNumber) string {
+	if serialNum != nil {
+		return string(serialNum.VendorId) + om.stringifyVendorSpecific(serialNum.VendorSpecific)
+	}
+	return ""
+}
+
+func (om *OpenOltManager) stringifyVendorSpecific(vendorSpecific []byte) string {
+	tmp := fmt.Sprintf("%x", (uint32(vendorSpecific[0])>>4)&0x0f) +
+		fmt.Sprintf("%x", uint32(vendorSpecific[0]&0x0f)) +
+		fmt.Sprintf("%x", (uint32(vendorSpecific[1])>>4)&0x0f) +
+		fmt.Sprintf("%x", (uint32(vendorSpecific[1]))&0x0f) +
+		fmt.Sprintf("%x", (uint32(vendorSpecific[2])>>4)&0x0f) +
+		fmt.Sprintf("%x", (uint32(vendorSpecific[2]))&0x0f) +
+		fmt.Sprintf("%x", (uint32(vendorSpecific[3])>>4)&0x0f) +
+		fmt.Sprintf("%x", (uint32(vendorSpecific[3]))&0x0f)
+	return tmp
+}
+
+// readIndications to read the indications from the OLT device
+func (om *OpenOltManager) readIndications() {
+	defer log.Errorw("Indications ended", log.Fields{})
+	indications, err := om.openOltClient.EnableIndication(context.Background(), new(oop.Empty))
+	if err != nil {
+		log.Errorw("Failed to read indications", log.Fields{"err": err})
+		return
+	}
+	if indications == nil {
+		log.Errorw("Indications is nil", log.Fields{})
+		return
+	}
+
+	// Create an exponential backoff around re-enabling indications. The
+	// maximum elapsed time for the back off is set to 0 so that we will
+	// continue to retry. The max interval defaults to 1m, but is set
+	// here for code clarity
+	indicationBackoff := backoff.NewExponentialBackOff()
+	indicationBackoff.MaxElapsedTime = 0
+	indicationBackoff.MaxInterval = 1 * time.Minute
+	for {
+		indication, err := indications.Recv()
+		if err == io.EOF {
+			log.Infow("EOF for  indications", log.Fields{"err": err})
+			// Use an exponential back off to prevent getting into a tight loop
+			duration := indicationBackoff.NextBackOff()
+			if duration == backoff.Stop {
+				// If we reach a maximum then warn and reset the backoff
+				// timer and keep attempting.
+				log.Warnw("Maximum indication backoff reached, resetting backoff timer",
+					log.Fields{"max_indication_backoff": indicationBackoff.MaxElapsedTime})
+				indicationBackoff.Reset()
+			}
+			time.Sleep(indicationBackoff.NextBackOff())
+			indications, err = om.openOltClient.EnableIndication(context.Background(), new(oop.Empty))
+			if err != nil {
+				log.Errorw("Failed to read indications", log.Fields{"err": err})
+				return
+			}
+			continue
+		}
+		if err != nil {
+			log.Infow("Failed to read from indications", log.Fields{"err": err})
+			break
+		}
+		// Reset backoff if we have a successful receive
+		indicationBackoff.Reset()
+		om.handleIndication(indication)
+
+	}
+}
+
+func (om *OpenOltManager) handleIndication(indication *oop.Indication) {
+	switch indication.Data.(type) {
+	case *oop.Indication_OltInd:
+		log.Info("received olt indication")
+	case *oop.Indication_IntfInd:
+		intfInd := indication.GetIntfInd()
+		log.Infow("Received interface indication ", log.Fields{"InterfaceInd": intfInd})
+	case *oop.Indication_IntfOperInd:
+		intfOperInd := indication.GetIntfOperInd()
+		if intfOperInd.GetType() == "nni" {
+			log.Info("received interface oper indication for nni port")
+		} else if intfOperInd.GetType() == "pon" {
+			log.Info("received interface oper indication for pon port")
+		}
+		/*
+			case *oop.Indication_OnuDiscInd:
+				onuDiscInd := indication.GetOnuDiscInd()
+				log.Infow("Received Onu discovery indication ", log.Fields{"OnuDiscInd": onuDiscInd})
+		*/
+	case *oop.Indication_OnuInd:
+		onuInd := indication.GetOnuInd()
+		log.Infow("Received Onu indication ", log.Fields{"OnuInd": onuInd})
+	case *oop.Indication_OmciInd:
+		omciInd := indication.GetOmciInd()
+		log.Debugw("Received Omci indication ", log.Fields{"IntfId": omciInd.IntfId, "OnuId": omciInd.OnuId, "pkt": hex.EncodeToString(omciInd.Pkt)})
+	case *oop.Indication_PktInd:
+		pktInd := indication.GetPktInd()
+		log.Infow("Received pakcet indication ", log.Fields{"PktInd": pktInd})
+		/*
+				case *oop.Indication_PortStats:
+				portStats := indication.GetPortStats()
+				log.Infow("Received port stats", log.Fields{"portStats": portStats})
+			case *oop.Indication_FlowStats:
+				flowStats := indication.GetFlowStats()
+				log.Infow("Received flow stats", log.Fields{"FlowStats": flowStats})
+		*/
+	case *oop.Indication_AlarmInd:
+		alarmInd := indication.GetAlarmInd()
+		log.Infow("Received alarm indication ", log.Fields{"AlarmInd": alarmInd})
+	}
+}
+
+func (om *OpenOltManager) populateTechProfilePerPonPort() error {
+	var tpCount int
+	for _, techRange := range om.deviceInfo.Ranges {
+		for _, intfID := range techRange.IntfIds {
+			om.TechProfile[intfID] = &(om.rsrMgr.ResourceMgrs[intfID].TechProfileMgr)
+			tpCount++
+			log.Debugw("Init tech profile done", log.Fields{"intfID": intfID})
+		}
+	}
+	//Make sure we have as many tech_profiles as there are pon ports on the device
+	if tpCount != int(om.deviceInfo.GetPonPorts()) {
+		log.Errorw("Error while populating techprofile",
+			log.Fields{"numofTech": tpCount, "numPonPorts": om.deviceInfo.GetPonPorts()})
+		return errors.New("error while populating techprofile mgrs")
+	}
+	log.Infow("Populated techprofile for ponports successfully",
+		log.Fields{"numofTech": tpCount, "numPonPorts": om.deviceInfo.GetPonPorts()})
+	return nil
+}
diff --git a/core/onu_manager.go b/core/onu_manager.go
new file mode 100644
index 0000000..ad8aa8b
--- /dev/null
+++ b/core/onu_manager.go
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package core
+
+import (
+	"github.com/opencord/openolt-scale-tester/config"
+	"github.com/opencord/voltha-lib-go/v2/pkg/log"
+	oop "github.com/opencord/voltha-protos/v2/go/openolt"
+	"strconv"
+	"time"
+)
+
+func init() {
+	_, _ = log.AddPackage(log.JSON, log.DebugLevel, nil)
+}
+
+type SubscriberKey struct {
+	SubscriberName string
+}
+
+type OnuDevice struct {
+	SerialNum                string                        `json:"onuSerialNum"`
+	OnuID                    uint32                        `json:"onuID"`
+	PonIntf                  uint32                        `json:"ponIntf"`
+	OnuProvisionStartTime    time.Time                     `json:"onuProvisionStartTime"`
+	OnuProvisionEndTime      time.Time                     `json:"onuProvisionEndTime"`
+	OnuProvisionDurationInMs int64                         `json:"onuProvisionDurationInMilliSec"`
+	Reason                   string                        `json:"reason"` // If provisioning failed, this specifies the reason.
+	SubscriberMap            map[SubscriberKey]*Subscriber `json:"subscriberMap"`
+	openOltClient            oop.OpenoltClient
+	testConfig               *config.OpenOltScaleTesterConfig
+	rsrMgr                   *OpenOltResourceMgr
+}
+
+func (onu *OnuDevice) Start(oltCh chan bool) {
+	onu.SubscriberMap = make(map[SubscriberKey]*Subscriber)
+	onuCh := make(chan bool)
+	var subs uint
+
+	log.Infow("onu-provision-started-from-onu-manager", log.Fields{"onuID": onu.OnuID, "ponIntf": onu.PonIntf})
+
+	for subs = 0; subs < onu.testConfig.SubscribersPerOnu; subs++ {
+		subsName := onu.SerialNum + "-" + strconv.Itoa(int(subs))
+		subs := Subscriber{
+			SubscriberName: subsName,
+			OnuID:          onu.OnuID,
+			UniID:          uint32(subs),
+			PonIntf:        onu.PonIntf,
+			UniPortNo:      MkUniPortNum(onu.PonIntf, onu.OnuID, uint32(subs)),
+			Ctag:           GetCtag(onu.testConfig.WorkflowName, onu.PonIntf),
+			Stag:           GetStag(onu.testConfig.WorkflowName, onu.PonIntf),
+			OpenOltClient:  onu.openOltClient,
+			TestConfig:     onu.testConfig,
+			RsrMgr:         onu.rsrMgr,
+		}
+		subsKey := SubscriberKey{subsName}
+		onu.SubscriberMap[subsKey] = &subs
+
+		log.Infow("subscriber-provision-started-from-onu-manager", log.Fields{"subsName": subsName})
+		// Start provisioning the subscriber
+		go subs.Start(onuCh)
+
+		// Wait for subscriber provision to complete
+		<-onuCh
+
+		log.Infow("subscriber-provision-completed-from-onu-manager", log.Fields{"subsName": subsName})
+
+		//Sleep for configured interval before provisioning another subscriber
+		time.Sleep(time.Duration(onu.testConfig.TimeIntervalBetweenSubs))
+	}
+	// Indicate that the ONU provisioning is complete
+	oltCh <- true
+
+	log.Infow("onu-provision-completed-from-onu-manager", log.Fields{"onuID": onu.OnuID, "ponIntf": onu.PonIntf})
+}
diff --git a/core/resource_manager.go b/core/resource_manager.go
new file mode 100644
index 0000000..1bb5aaf
--- /dev/null
+++ b/core/resource_manager.go
@@ -0,0 +1,451 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//Package resourcemanager provides the utility for managing resources
+package core
+
+import (
+	"errors"
+	"fmt"
+	"strconv"
+	"strings"
+
+	"github.com/opencord/voltha-lib-go/v2/pkg/log"
+	ponrmgr "github.com/opencord/voltha-lib-go/v2/pkg/ponresourcemanager"
+	"github.com/opencord/voltha-protos/v2/go/openolt"
+)
+
+func init() {
+	_, _ = log.AddPackage(log.JSON, log.DebugLevel, nil)
+}
+
+// OpenOltResourceMgr holds resource related information as provided below for each field
+type OpenOltResourceMgr struct {
+	deviceInfo *openolt.DeviceInfo
+	// array of pon resource managers per interface technology
+	ResourceMgrs map[uint32]*ponrmgr.PONResourceManager
+}
+
+// NewResourceMgr init a New resource manager instance which in turn instantiates pon resource manager
+// instances according to technology. Initializes the default resource ranges for all
+// the resources.
+func NewResourceMgr(deviceID string, KVStoreHostPort string, kvStoreType string, deviceType string, devInfo *openolt.DeviceInfo) *OpenOltResourceMgr {
+	var ResourceMgr OpenOltResourceMgr
+	log.Debugf("Init new resource manager")
+
+	ResourceMgr.deviceInfo = devInfo
+
+	Ranges := make(map[string]*openolt.DeviceInfo_DeviceResourceRanges)
+	RsrcMgrsByTech := make(map[string]*ponrmgr.PONResourceManager)
+	ResourceMgr.ResourceMgrs = make(map[uint32]*ponrmgr.PONResourceManager)
+
+	// TODO self.args = registry('main').get_args()
+
+	/*
+	   If a legacy driver returns protobuf without any ranges,s synthesize one from
+	   the legacy global per-device information. This, in theory, is temporary until
+	   the legacy drivers are upgrade to support pool ranges.
+	*/
+	if devInfo.Ranges == nil {
+		var ranges openolt.DeviceInfo_DeviceResourceRanges
+		ranges.Technology = devInfo.GetTechnology()
+
+		NumPONPorts := devInfo.GetPonPorts()
+		var index uint32
+		for index = 0; index < NumPONPorts; index++ {
+			ranges.IntfIds = append(ranges.IntfIds, index)
+		}
+
+		var Pool openolt.DeviceInfo_DeviceResourceRanges_Pool
+		Pool.Type = openolt.DeviceInfo_DeviceResourceRanges_Pool_ONU_ID
+		Pool.Start = devInfo.OnuIdStart
+		Pool.End = devInfo.OnuIdEnd
+		Pool.Sharing = openolt.DeviceInfo_DeviceResourceRanges_Pool_DEDICATED_PER_INTF
+		onuPool := Pool
+		ranges.Pools = append(ranges.Pools, &onuPool)
+
+		Pool.Type = openolt.DeviceInfo_DeviceResourceRanges_Pool_ALLOC_ID
+		Pool.Start = devInfo.AllocIdStart
+		Pool.End = devInfo.AllocIdEnd
+		Pool.Sharing = openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH
+		allocPool := Pool
+		ranges.Pools = append(ranges.Pools, &allocPool)
+
+		Pool.Type = openolt.DeviceInfo_DeviceResourceRanges_Pool_GEMPORT_ID
+		Pool.Start = devInfo.GemportIdStart
+		Pool.End = devInfo.GemportIdEnd
+		Pool.Sharing = openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH
+		gemPool := Pool
+		ranges.Pools = append(ranges.Pools, &gemPool)
+
+		Pool.Type = openolt.DeviceInfo_DeviceResourceRanges_Pool_FLOW_ID
+		Pool.Start = devInfo.FlowIdStart
+		Pool.End = devInfo.FlowIdEnd
+		Pool.Sharing = openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH
+		ranges.Pools = append(ranges.Pools, &Pool)
+		// Add to device info
+		devInfo.Ranges = append(devInfo.Ranges, &ranges)
+	}
+
+	// Create a separate Resource Manager instance for each range. This assumes that
+	// each technology is represented by only a single range
+	var GlobalPONRsrcMgr *ponrmgr.PONResourceManager
+	var err error
+	IPPort := strings.Split(KVStoreHostPort, ":")
+	for _, TechRange := range devInfo.Ranges {
+		technology := TechRange.Technology
+		log.Debugf("Device info technology %s", technology)
+		Ranges[technology] = TechRange
+		port, _ := strconv.Atoi(IPPort[1])
+		RsrcMgrsByTech[technology], err = ponrmgr.NewPONResourceManager(technology, deviceType, deviceID,
+			kvStoreType, IPPort[0], port)
+		if err != nil {
+			log.Errorf("Failed to create pon resource manager instance for technology %s", technology)
+			return nil
+		}
+		// resource_mgrs_by_tech[technology] = resource_mgr
+		if GlobalPONRsrcMgr == nil {
+			GlobalPONRsrcMgr = RsrcMgrsByTech[technology]
+		}
+		for _, IntfID := range TechRange.IntfIds {
+			ResourceMgr.ResourceMgrs[(IntfID)] = RsrcMgrsByTech[technology]
+		}
+		// self.initialize_device_resource_range_and_pool(resource_mgr, global_resource_mgr, arange)
+		InitializeDeviceResourceRangeAndPool(RsrcMgrsByTech[technology], GlobalPONRsrcMgr,
+			TechRange, devInfo)
+	}
+	// After we have initialized resource ranges, initialize the
+	// resource pools accordingly.
+	for _, PONRMgr := range RsrcMgrsByTech {
+		_ = PONRMgr.InitDeviceResourcePool()
+	}
+	log.Info("Initialization of  resource manager success!")
+	return &ResourceMgr
+}
+
+// InitializeDeviceResourceRangeAndPool initializes the resource range pool according to the sharing type, then apply
+// device specific information. If KV doesn't exist
+// or is broader than the device, the device's information will
+// dictate the range limits
+func InitializeDeviceResourceRangeAndPool(ponRMgr *ponrmgr.PONResourceManager, globalPONRMgr *ponrmgr.PONResourceManager,
+	techRange *openolt.DeviceInfo_DeviceResourceRanges, devInfo *openolt.DeviceInfo) {
+
+	// init the resource range pool according to the sharing type
+
+	log.Debugf("Resource range pool init for technology %s", ponRMgr.Technology)
+	// first load from KV profiles
+	status := ponRMgr.InitResourceRangesFromKVStore()
+	if !status {
+		log.Debugf("Failed to load resource ranges from KV store for tech %s", ponRMgr.Technology)
+	}
+
+	/*
+	   Then apply device specific information. If KV doesn't exist
+	   or is broader than the device, the device's information will
+	   dictate the range limits
+	*/
+	log.Debugf("Using device info to init pon resource ranges for tech", ponRMgr.Technology)
+
+	ONUIDStart := devInfo.OnuIdStart
+	ONUIDEnd := devInfo.OnuIdEnd
+	ONUIDShared := openolt.DeviceInfo_DeviceResourceRanges_Pool_DEDICATED_PER_INTF
+	ONUIDSharedPoolID := uint32(0)
+	AllocIDStart := devInfo.AllocIdStart
+	AllocIDEnd := devInfo.AllocIdEnd
+	AllocIDShared := openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH // TODO EdgeCore/BAL limitation
+	AllocIDSharedPoolID := uint32(0)
+	GEMPortIDStart := devInfo.GemportIdStart
+	GEMPortIDEnd := devInfo.GemportIdEnd
+	GEMPortIDShared := openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH // TODO EdgeCore/BAL limitation
+	GEMPortIDSharedPoolID := uint32(0)
+	FlowIDStart := devInfo.FlowIdStart
+	FlowIDEnd := devInfo.FlowIdEnd
+	FlowIDShared := openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH // TODO EdgeCore/BAL limitation
+	FlowIDSharedPoolID := uint32(0)
+
+	var FirstIntfPoolID uint32
+	var SharedPoolID uint32
+
+	/*
+	 * As a zero check is made against SharedPoolID to check whether the resources are shared across all intfs
+	 * if resources are shared across interfaces then SharedPoolID is given a positive number.
+	 */
+	for _, FirstIntfPoolID = range techRange.IntfIds {
+		// skip the intf id 0
+		if FirstIntfPoolID == 0 {
+			continue
+		}
+		break
+	}
+
+	for _, RangePool := range techRange.Pools {
+		if RangePool.Sharing == openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH {
+			SharedPoolID = FirstIntfPoolID
+		} else if RangePool.Sharing == openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_SAME_TECH {
+			SharedPoolID = FirstIntfPoolID
+		} else {
+			SharedPoolID = 0
+		}
+		if RangePool.Type == openolt.DeviceInfo_DeviceResourceRanges_Pool_ONU_ID {
+			ONUIDStart = RangePool.Start
+			ONUIDEnd = RangePool.End
+			ONUIDShared = RangePool.Sharing
+			ONUIDSharedPoolID = SharedPoolID
+		} else if RangePool.Type == openolt.DeviceInfo_DeviceResourceRanges_Pool_ALLOC_ID {
+			AllocIDStart = RangePool.Start
+			AllocIDEnd = RangePool.End
+			AllocIDShared = RangePool.Sharing
+			AllocIDSharedPoolID = SharedPoolID
+		} else if RangePool.Type == openolt.DeviceInfo_DeviceResourceRanges_Pool_GEMPORT_ID {
+			GEMPortIDStart = RangePool.Start
+			GEMPortIDEnd = RangePool.End
+			GEMPortIDShared = RangePool.Sharing
+			GEMPortIDSharedPoolID = SharedPoolID
+		} else if RangePool.Type == openolt.DeviceInfo_DeviceResourceRanges_Pool_FLOW_ID {
+			FlowIDStart = RangePool.Start
+			FlowIDEnd = RangePool.End
+			FlowIDShared = RangePool.Sharing
+			FlowIDSharedPoolID = SharedPoolID
+		}
+	}
+
+	log.Debugw("Device info init", log.Fields{"technology": techRange.Technology,
+		"onu_id_start": ONUIDStart, "onu_id_end": ONUIDEnd, "onu_id_shared_pool_id": ONUIDSharedPoolID,
+		"alloc_id_start": AllocIDStart, "alloc_id_end": AllocIDEnd,
+		"alloc_id_shared_pool_id": AllocIDSharedPoolID,
+		"gemport_id_start":        GEMPortIDStart, "gemport_id_end": GEMPortIDEnd,
+		"gemport_id_shared_pool_id": GEMPortIDSharedPoolID,
+		"flow_id_start":             FlowIDStart,
+		"flow_id_end_idx":           FlowIDEnd,
+		"flow_id_shared_pool_id":    FlowIDSharedPoolID,
+		"intf_ids":                  techRange.IntfIds,
+		"uni_id_start":              0,
+		"uni_id_end_idx":            1, /*MaxUNIIDperONU()*/
+	})
+
+	ponRMgr.InitDefaultPONResourceRanges(ONUIDStart, ONUIDEnd, ONUIDSharedPoolID,
+		AllocIDStart, AllocIDEnd, AllocIDSharedPoolID,
+		GEMPortIDStart, GEMPortIDEnd, GEMPortIDSharedPoolID,
+		FlowIDStart, FlowIDEnd, FlowIDSharedPoolID, 0, 1,
+		devInfo.PonPorts, techRange.IntfIds)
+
+	// For global sharing, make sure to refresh both local and global resource manager instances' range
+
+	if ONUIDShared == openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH {
+		globalPONRMgr.UpdateRanges(ponrmgr.ONU_ID_START_IDX, ONUIDStart, ponrmgr.ONU_ID_END_IDX, ONUIDEnd,
+			"", 0, nil)
+		ponRMgr.UpdateRanges(ponrmgr.ONU_ID_START_IDX, ONUIDStart, ponrmgr.ONU_ID_END_IDX, ONUIDEnd,
+			"", 0, globalPONRMgr)
+	}
+	if AllocIDShared == openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH {
+		globalPONRMgr.UpdateRanges(ponrmgr.ALLOC_ID_START_IDX, AllocIDStart, ponrmgr.ALLOC_ID_END_IDX, AllocIDEnd,
+			"", 0, nil)
+
+		ponRMgr.UpdateRanges(ponrmgr.ALLOC_ID_START_IDX, AllocIDStart, ponrmgr.ALLOC_ID_END_IDX, AllocIDEnd,
+			"", 0, globalPONRMgr)
+	}
+	if GEMPortIDShared == openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH {
+		globalPONRMgr.UpdateRanges(ponrmgr.GEMPORT_ID_START_IDX, GEMPortIDStart, ponrmgr.GEMPORT_ID_END_IDX, GEMPortIDEnd,
+			"", 0, nil)
+		ponRMgr.UpdateRanges(ponrmgr.GEMPORT_ID_START_IDX, GEMPortIDStart, ponrmgr.GEMPORT_ID_END_IDX, GEMPortIDEnd,
+			"", 0, globalPONRMgr)
+	}
+	if FlowIDShared == openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH {
+		globalPONRMgr.UpdateRanges(ponrmgr.FLOW_ID_START_IDX, FlowIDStart, ponrmgr.FLOW_ID_END_IDX, FlowIDEnd,
+			"", 0, nil)
+		ponRMgr.UpdateRanges(ponrmgr.FLOW_ID_START_IDX, FlowIDStart, ponrmgr.FLOW_ID_END_IDX, FlowIDEnd,
+			"", 0, globalPONRMgr)
+	}
+
+	// Make sure loaded range fits the platform bit encoding ranges
+	ponRMgr.UpdateRanges(ponrmgr.UNI_ID_START_IDX, 0, ponrmgr.UNI_ID_END_IDX /* TODO =OpenOltPlatform.MAX_UNIS_PER_ONU-1*/, 1, "", 0, nil)
+}
+
+// Delete clears used resources for the particular olt device being deleted
+func (RsrcMgr *OpenOltResourceMgr) Delete() error {
+	/* TODO
+	   def __del__(self):
+	           self.log.info("clearing-device-resource-pool")
+	           for key, resource_mgr in self.resource_mgrs.iteritems():
+	               resource_mgr.clear_device_resource_pool()
+
+	       def assert_pon_id_limit(self, pon_intf_id):
+	           assert pon_intf_id in self.resource_mgrs
+
+	       def assert_onu_id_limit(self, pon_intf_id, onu_id):
+	           self.assert_pon_id_limit(pon_intf_id)
+	           self.resource_mgrs[pon_intf_id].assert_resource_limits(onu_id, PONResourceManager.ONU_ID)
+
+	       @property
+	       def max_uni_id_per_onu(self):
+	           return 0 #OpenOltPlatform.MAX_UNIS_PER_ONU-1, zero-based indexing Uncomment or override to make default multi-uni
+
+	       def assert_uni_id_limit(self, pon_intf_id, onu_id, uni_id):
+	           self.assert_onu_id_limit(pon_intf_id, onu_id)
+	           self.resource_mgrs[pon_intf_id].assert_resource_limits(uni_id, PONResourceManager.UNI_ID)
+	*/
+	for _, rsrcMgr := range RsrcMgr.ResourceMgrs {
+		if err := rsrcMgr.ClearDeviceResourcePool(); err != nil {
+			log.Debug("Failed to clear device resource pool")
+			return err
+		}
+	}
+	log.Debug("Cleared device resource pool")
+	return nil
+}
+
+// GetONUID returns the available OnuID for the given pon-port
+func (RsrcMgr *OpenOltResourceMgr) GetONUID(ponIntfID uint32) (uint32, error) {
+	// Check if Pon Interface ID is present in Resource-manager-map
+	if _, ok := RsrcMgr.ResourceMgrs[ponIntfID]; !ok {
+		err := errors.New("invalid-pon-interface-" + strconv.Itoa(int(ponIntfID)))
+		return 0, err
+	}
+	// Get ONU id for a provided pon interface ID.
+	ONUID, err := RsrcMgr.ResourceMgrs[ponIntfID].GetResourceID(ponIntfID,
+		ponrmgr.ONU_ID, 1)
+	if err != nil {
+		log.Errorf("Failed to get resource for interface %d for type %s",
+			ponIntfID, ponrmgr.ONU_ID)
+		return 0, err
+	}
+	if ONUID != nil {
+		RsrcMgr.ResourceMgrs[ponIntfID].InitResourceMap(fmt.Sprintf("%d,%d", ponIntfID, ONUID[0]))
+		return ONUID[0], err
+	}
+
+	return 0, err // return OnuID 0 on error
+}
+
+// GetAllocID return the first Alloc ID for a given pon interface id and onu id and then update the resource map on
+// the KV store with the list of alloc_ids allocated for the pon_intf_onu_id tuple
+// Currently of all the alloc_ids available, it returns the first alloc_id in the list for tha given ONU
+func (RsrcMgr *OpenOltResourceMgr) GetAllocID(intfID uint32, onuID uint32, uniID uint32) uint32 {
+
+	var err error
+	IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", intfID, onuID, uniID)
+	AllocID := RsrcMgr.ResourceMgrs[intfID].GetCurrentAllocIDForOnu(IntfOnuIDUniID)
+	if AllocID != nil {
+		// Since we support only one alloc_id for the ONU at the moment,
+		// return the first alloc_id in the list, if available, for that
+		// ONU.
+		log.Debugw("Retrieved alloc ID from pon resource mgr", log.Fields{"AllocID": AllocID})
+		return AllocID[0]
+	}
+	AllocID, err = RsrcMgr.ResourceMgrs[intfID].GetResourceID(intfID,
+		ponrmgr.ALLOC_ID, 1)
+
+	if AllocID == nil || err != nil {
+		log.Error("Failed to allocate alloc id")
+		return 0
+	}
+	// update the resource map on KV store with the list of alloc_id
+	// allocated for the pon_intf_onu_id tuple
+	err = RsrcMgr.ResourceMgrs[intfID].UpdateAllocIdsForOnu(IntfOnuIDUniID, AllocID)
+	if err != nil {
+		log.Error("Failed to update Alloc ID")
+		return 0
+	}
+	log.Debugw("Allocated new Tcont from pon resource mgr", log.Fields{"AllocID": AllocID})
+	return AllocID[0]
+}
+
+// GetGEMPortID gets gem port id for a particular pon port, onu id and uni id and then update the resource map on
+// the KV store with the list of gemport_id allocated for the pon_intf_onu_id tuple
+func (RsrcMgr *OpenOltResourceMgr) GetGEMPortID(ponPort uint32, onuID uint32,
+	uniID uint32, NumOfPorts uint32) ([]uint32, error) {
+
+	/* Get gem port id for a particular pon port, onu id
+	   and uni id.
+	*/
+
+	var err error
+	IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", ponPort, onuID, uniID)
+
+	GEMPortList := RsrcMgr.ResourceMgrs[ponPort].GetCurrentGEMPortIDsForOnu(IntfOnuIDUniID)
+	if GEMPortList != nil {
+		return GEMPortList, nil
+	}
+
+	GEMPortList, err = RsrcMgr.ResourceMgrs[ponPort].GetResourceID(ponPort,
+		ponrmgr.GEMPORT_ID, NumOfPorts)
+	if err != nil && GEMPortList == nil {
+		log.Errorf("Failed to get gem port id for %s", IntfOnuIDUniID)
+		return nil, err
+	}
+
+	// update the resource map on KV store with the list of gemport_id
+	// allocated for the pon_intf_onu_id tuple
+	err = RsrcMgr.ResourceMgrs[ponPort].UpdateGEMPortIDsForOnu(IntfOnuIDUniID,
+		GEMPortList)
+	if err != nil {
+		log.Errorf("Failed to update GEM ports to kv store for %s", IntfOnuIDUniID)
+		return nil, err
+	}
+
+	return GEMPortList, err
+}
+
+// FreeFlowID returns the free flow id for a given interface, onu id and uni id
+func (RsrcMgr *OpenOltResourceMgr) FreeFlowID(IntfID uint32, onuID int32,
+	uniID int32, FlowID uint32) {
+	var IntfONUID string
+	var err error
+	FlowIds := make([]uint32, 0)
+
+	FlowIds = append(FlowIds, FlowID)
+	IntfONUID = fmt.Sprintf("%d,%d,%d", IntfID, onuID, uniID)
+	err = RsrcMgr.ResourceMgrs[IntfID].UpdateFlowIDForOnu(IntfONUID, FlowID, false)
+	if err != nil {
+		log.Errorw("Failed to Update flow id  for", log.Fields{"intf": IntfONUID})
+	}
+	RsrcMgr.ResourceMgrs[IntfID].RemoveFlowIDInfo(IntfONUID, FlowID)
+	RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(IntfID, ponrmgr.FLOW_ID, FlowIds)
+}
+
+// FreeFlowIDs releases the flow Ids
+func (RsrcMgr *OpenOltResourceMgr) FreeFlowIDs(IntfID uint32, onuID uint32,
+	uniID uint32, FlowID []uint32) {
+
+	RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(IntfID, ponrmgr.FLOW_ID, FlowID)
+
+	var IntfOnuIDUniID string
+	var err error
+	for _, flow := range FlowID {
+		IntfOnuIDUniID = fmt.Sprintf("%d,%d,%d", IntfID, onuID, uniID)
+		err = RsrcMgr.ResourceMgrs[IntfID].UpdateFlowIDForOnu(IntfOnuIDUniID, flow, false)
+		if err != nil {
+			log.Errorw("Failed to Update flow id for", log.Fields{"intf": IntfOnuIDUniID})
+		}
+		RsrcMgr.ResourceMgrs[IntfID].RemoveFlowIDInfo(IntfOnuIDUniID, flow)
+	}
+}
+
+// FreeAllocID frees AllocID on the PON resource pool and also frees the allocID association
+// for the given OLT device.
+func (RsrcMgr *OpenOltResourceMgr) FreeAllocID(IntfID uint32, allocID uint32) {
+	allocIDs := make([]uint32, 0)
+	allocIDs = append(allocIDs, allocID)
+	RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(IntfID, ponrmgr.ALLOC_ID, allocIDs)
+}
+
+// FreeGemPortID frees GemPortID on the PON resource pool and also frees the gemPortID association
+// for the given OLT device.
+func (RsrcMgr *OpenOltResourceMgr) FreeGemPortID(IntfID uint32, gemPortID uint32) {
+	gemPortIDs := make([]uint32, 0)
+	gemPortIDs = append(gemPortIDs, gemPortID)
+	RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(IntfID, ponrmgr.GEMPORT_ID, gemPortIDs)
+}
diff --git a/core/subscriber_manager.go b/core/subscriber_manager.go
new file mode 100644
index 0000000..717c323
--- /dev/null
+++ b/core/subscriber_manager.go
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package core
+
+import (
+	"fmt"
+	"github.com/opencord/openolt-scale-tester/config"
+	"github.com/opencord/voltha-lib-go/v2/pkg/log"
+	"github.com/opencord/voltha-lib-go/v2/pkg/techprofile"
+	oop "github.com/opencord/voltha-protos/v2/go/openolt"
+)
+
+func init() {
+	_, _ = log.AddPackage(log.JSON, log.DebugLevel, nil)
+}
+
+const (
+	SUBSCRIBER_PROVISION_SUCCESS = iota
+	TP_INSTANCE_CREATION_FAILED
+	FLOW_ADD_FAILED
+	SCHED_CREATION_FAILED
+	QUEUE_CREATION_FAILED
+)
+
+const (
+	UniPortName = "pon-{%d}/onu-{%d}/uni-{%d}"
+)
+
+var Reason = [...]string{
+	"SUBSCRIBER_PROVISION_SUCCESS",
+	"TP_INSTANCE_CREATION_FAILED",
+	"FLOW_ADD_FAILED",
+	"SCHED_CREATION_FAILED",
+	"QUEUE_CREATION_FAILED",
+}
+
+func ReasonCodeToReasonString(reasonCode int) string {
+	return Reason[reasonCode]
+}
+
+type Subscriber struct {
+	SubscriberName string   `json:"subscriberName"`
+	OnuID          uint32   `json:"onuID"`
+	UniID          uint32   `json:"uniID"`
+	PonIntf        uint32   `json:"ponIntf"`
+	UniPortNo      uint32   `json:"uniPortNo"`
+	Ctag           uint32   `json:"ctag"`
+	Stag           uint32   `json:"stag"`
+	GemPortIDs     []uint32 `json:"gemPortIds"`
+	AllocIDs       []uint32 `json:"allocIds"`
+	FlowIDs        []uint32 `json:"flowIds"`
+	Reason         string   `json:"reason"`
+
+	FailedFlowCnt  uint32 `json:"failedFlowCnt"`
+	SuccessFlowCnt uint32 `json:"successFlowCnt"`
+
+	FailedSchedCnt  uint32 `json:"failedSchedCnt"`
+	SuccessSchedCnt uint32 `json:"successShedCnt"`
+
+	FailedQueueCnt  uint32 `json:"failedQueueCnt"`
+	SuccessQueueCnt uint32 `json:"successQueueCnt"`
+
+	FailedFlows  []oop.Flow             `json:"failedFlows"`
+	FailedScheds []oop.TrafficScheduler `json:"failedScheds"`
+	FailedQueues []oop.TrafficQueue     `json:"failedQueues"`
+
+	TpInstance    map[int]*techprofile.TechProfile
+	OpenOltClient oop.OpenoltClient
+	TestConfig    *config.OpenOltScaleTesterConfig
+	RsrMgr        *OpenOltResourceMgr
+}
+
+func (subs *Subscriber) Start(onuCh chan bool) {
+
+	log.Infow("workflow-deploy-started-for-subscriber", log.Fields{"subsName": subs.SubscriberName})
+
+	subs.TpInstance = make(map[int]*techprofile.TechProfile)
+
+	for _, tpID := range subs.TestConfig.TpIDList {
+		uniPortName := fmt.Sprintf(UniPortName, subs.PonIntf, subs.OnuID, subs.UniID)
+		if subs.TpInstance[tpID] =
+			subs.RsrMgr.ResourceMgrs[subs.PonIntf].TechProfileMgr.CreateTechProfInstance(
+				uint32(tpID), uniPortName, subs.PonIntf); subs.TpInstance[tpID] == nil {
+			log.Errorw("error-creating-tp-instance-for-subs",
+				log.Fields{"subsName": subs.SubscriberName, "onuID": subs.OnuID, "tpID": tpID})
+
+			subs.Reason = ReasonCodeToReasonString(TP_INSTANCE_CREATION_FAILED)
+			onuCh <- true
+
+			return
+		}
+	}
+
+	DeployWorkflow(subs)
+
+	log.Infow("workflow-deploy-completed-for-subscriber", log.Fields{"subsName": subs.SubscriberName})
+
+	onuCh <- true
+}
diff --git a/core/utils.go b/core/utils.go
new file mode 100644
index 0000000..01ff500
--- /dev/null
+++ b/core/utils.go
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package core
+
+import (
+	"fmt"
+	"github.com/opencord/voltha-lib-go/v2/pkg/log"
+	"github.com/opencord/voltha-protos/v2/go/openolt"
+)
+
+var AttCtag map[uint32]uint32
+
+func init() {
+	_, _ = log.AddPackage(log.JSON, log.DebugLevel, nil)
+	AttCtag = make(map[uint32]uint32)
+}
+
+const (
+	vendorName = "ABCD"
+	// Number of bits for the physical UNI of the ONUs
+	bitsForUniID = 4
+	// Number of bits for the ONU ID
+	bitsForONUID = 8
+	//MaxOnusPerPon is Max number of ONUs on any PON port
+	MaxOnusPerPon = 1 << bitsForONUID
+)
+
+var vendorSpecificId = 1000
+
+func GenerateNextONUSerialNumber() *openolt.SerialNumber {
+
+	vi := []byte(vendorName)
+
+	vendorSpecificId += 1
+	vs := []byte(fmt.Sprint(vendorSpecificId))
+	// log.Infow("vendor-id-and-vendor-specific", log.Fields{"vi":vi, "vs":vs})
+	sn := &openolt.SerialNumber{VendorId: vi, VendorSpecific: vs}
+	// log.Infow("serial-num", log.Fields{"sn":sn})
+
+	return sn
+}
+
+//MkUniPortNum returns new UNIportNum based on intfID, inuID and uniID
+func MkUniPortNum(intfID, onuID, uniID uint32) uint32 {
+	var limit = int(onuID)
+	if limit > MaxOnusPerPon {
+		log.Warn("Warning: exceeded the MAX ONUS per PON")
+	}
+	return (intfID << (bitsForUniID + bitsForONUID)) | (onuID << bitsForUniID) | uniID
+}
+
+func GetAttCtag(ponIntf uint32) uint32 {
+	var currCtag uint32
+	var ok bool
+	if currCtag, ok = AttCtag[ponIntf]; !ok {
+		// Start with ctag 2
+		AttCtag[ponIntf] = 2
+		return AttCtag[ponIntf]
+	}
+	AttCtag[ponIntf] = currCtag + 1
+	return AttCtag[ponIntf]
+}
+
+func GetAttStag(ponIntf uint32) uint32 {
+	// start with stag 2
+	return ponIntf + 2
+}
+
+// TODO: More workflow support to be added here
+func GetCtag(workFlowName string, ponIntf uint32) uint32 {
+	switch workFlowName {
+	case "ATT":
+		return GetAttCtag(ponIntf)
+	default:
+		log.Errorw("unknown-workflowname", log.Fields{"workflow": workFlowName})
+	}
+	return 0
+}
+
+func GetStag(workFlowName string, ponIntf uint32) uint32 {
+	switch workFlowName {
+	case "ATT":
+		return GetAttStag(ponIntf)
+	default:
+		log.Errorw("unknown-workflowname", log.Fields{"workflow": workFlowName})
+	}
+	return 0
+}
diff --git a/core/workflow_manager.go b/core/workflow_manager.go
new file mode 100644
index 0000000..b8674ac
--- /dev/null
+++ b/core/workflow_manager.go
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package core
+
+import (
+	"errors"
+	"github.com/opencord/openolt-scale-tester/config"
+	"github.com/opencord/voltha-lib-go/v2/pkg/log"
+	oop "github.com/opencord/voltha-protos/v2/go/openolt"
+)
+
+func init() {
+	_, _ = log.AddPackage(log.JSON, log.DebugLevel, nil)
+}
+
+type WorkFlow interface {
+	ProvisionScheds(subs *Subscriber) error
+	ProvisionQueues(subs *Subscriber) error
+	ProvisionEapFlow(subs *Subscriber) error
+	ProvisionDhcpFlow(subs *Subscriber) error
+	ProvisionIgmpFlow(subs *Subscriber) error
+	ProvisionHsiaFlow(subs *Subscriber) error
+	// TODO: Add new items here as needed.
+}
+
+func DeployWorkflow(subs *Subscriber) {
+	var wf = getWorkFlow(subs)
+
+	// TODO: Catch and log errors for below items if needed.
+	if err := wf.ProvisionScheds(subs); err != nil {
+		subs.Reason = err.Error()
+		return
+	}
+
+	if err := wf.ProvisionQueues(subs); err != nil {
+		subs.Reason = err.Error()
+		return
+	}
+
+	if err := wf.ProvisionEapFlow(subs); err != nil {
+		subs.Reason = err.Error()
+		return
+	}
+
+	if err := wf.ProvisionDhcpFlow(subs); err != nil {
+		subs.Reason = err.Error()
+		return
+	}
+
+	if err := wf.ProvisionIgmpFlow(subs); err != nil {
+		subs.Reason = err.Error()
+		return
+	}
+
+	if err := wf.ProvisionHsiaFlow(subs); err != nil {
+		subs.Reason = err.Error()
+		return
+	}
+
+	subs.Reason = ReasonCodeToReasonString(SUBSCRIBER_PROVISION_SUCCESS)
+}
+
+func getWorkFlow(subs *Subscriber) WorkFlow {
+	switch subs.TestConfig.WorkflowName {
+	case "ATT":
+		log.Info("chosen-att-workflow")
+		return AttWorkFlow{}
+	// TODO: Add new workflow here
+	default:
+		log.Errorw("operator-workflow-not-supported-yet", log.Fields{"workflowName": subs.TestConfig.WorkflowName})
+	}
+	return nil
+}
+
+// This function should get called even before provisioning an ONUs to install trap-from-nni flows.
+// The flows installed here are not related to any subscribers.
+func ProvisionNniTrapFlow(oo oop.OpenoltClient, config *config.OpenOltScaleTesterConfig, rsrMgr *OpenOltResourceMgr) error {
+	switch config.WorkflowName {
+	case "ATT":
+		if err := ProvisionAttNniTrapFlow(oo, config, rsrMgr); err != nil {
+			log.Error("error-installing-flow", log.Fields{"err": err})
+			return err
+		}
+	// TODO: Add new items here
+	default:
+		log.Errorw("operator-workflow-not-supported-yet", log.Fields{"workflowName": config.WorkflowName})
+		return errors.New("workflow-not-supported")
+	}
+	return nil
+}