[VOL-2588] Simplify TT case

Change-Id: Ia22dbda21b0702ac0444a17ae3e5063c7723e395
diff --git a/core/att_workflow.go b/core/att_workflow.go
index 947a38d..cf5cbae 100644
--- a/core/att_workflow.go
+++ b/core/att_workflow.go
@@ -21,10 +21,10 @@
 	"strings"
 
 	"github.com/opencord/openolt-scale-tester/config"
-	"github.com/opencord/voltha-lib-go/v2/pkg/log"
-	"github.com/opencord/voltha-lib-go/v2/pkg/ponresourcemanager"
-	oop "github.com/opencord/voltha-protos/v2/go/openolt"
-	tp_pb "github.com/opencord/voltha-protos/v2/go/tech_profile"
+	"github.com/opencord/voltha-lib-go/v3/pkg/log"
+	"github.com/opencord/voltha-lib-go/v3/pkg/ponresourcemanager"
+	oop "github.com/opencord/voltha-protos/v3/go/openolt"
+	tp_pb "github.com/opencord/voltha-protos/v3/go/tech_profile"
 	"golang.org/x/net/context"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
@@ -42,7 +42,7 @@
 	var flowID []uint32
 	var err error
 
-	if flowID, err = rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].GetResourceID(uint32(config.NniIntfID),
+	if flowID, err = rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].GetResourceID(context.Background(), uint32(config.NniIntfID),
 		ponresourcemanager.FLOW_ID, 1); err != nil {
 		return err
 	}
@@ -67,7 +67,7 @@
 
 	if err != nil {
 		log.Errorw("Failed to Add DHCP IPv4 to device", log.Fields{"err": err, "deviceFlow": flow})
-		rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].FreeResourceID(uint32(config.NniIntfID),
+		rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].FreeResourceID(context.Background(), uint32(config.NniIntfID),
 			ponresourcemanager.FLOW_ID, flowID)
 		return err
 	}
@@ -80,7 +80,7 @@
 	var flowID []uint32
 	var err error
 
-	if flowID, err = rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].GetResourceID(uint32(config.NniIntfID),
+	if flowID, err = rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].GetResourceID(context.Background(), uint32(config.NniIntfID),
 		ponresourcemanager.FLOW_ID, 1); err != nil {
 		return err
 	}
@@ -105,7 +105,7 @@
 
 	if err != nil {
 		log.Errorw("Failed to Add DHCP IPV6 to device", log.Fields{"err": err, "deviceFlow": flow})
-		rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].FreeResourceID(uint32(config.NniIntfID),
+		rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].FreeResourceID(context.Background(), uint32(config.NniIntfID),
 			ponresourcemanager.FLOW_ID, flowID)
 		return err
 	}
@@ -216,12 +216,12 @@
 		for pos, pbitSet := range strings.TrimPrefix(pBitMap, "0b") {
 			if pbitSet == '1' {
 				pcp := uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
-				if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(uint32(subs.PonIntf),
+				if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(context.Background(), uint32(subs.PonIntf),
 					ponresourcemanager.FLOW_ID, 1); err != nil {
 					return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
 				} else {
 					if err := AddFlow(subs, EapolFlow, Upstream, flowID[0], allocID, gemID, pcp); err != nil {
-						subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(uint32(subs.PonIntf),
+						subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(context.Background(), uint32(subs.PonIntf),
 							ponresourcemanager.FLOW_ID, flowID)
 						return err
 					}
@@ -247,12 +247,12 @@
 		for pos, pbitSet := range strings.TrimPrefix(pBitMap, "0b") {
 			if pbitSet == '1' {
 				pcp := uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
-				if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(uint32(subs.PonIntf),
+				if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(context.Background(), uint32(subs.PonIntf),
 					ponresourcemanager.FLOW_ID, 1); err != nil {
 					return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
 				} else {
 					if err := AddFlow(subs, DhcpFlowIPV4, Upstream, flowID[0], allocID, gemID, pcp); err != nil {
-						subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(uint32(subs.PonIntf),
+						subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(context.Background(), uint32(subs.PonIntf),
 							ponresourcemanager.FLOW_ID, flowID)
 						return err
 					}
@@ -278,12 +278,12 @@
 		for pos, pbitSet := range strings.TrimPrefix(pBitMap, "0b") {
 			if pbitSet == '1' {
 				pcp := uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
-				if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(uint32(subs.PonIntf),
+				if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(context.Background(), uint32(subs.PonIntf),
 					ponresourcemanager.FLOW_ID, 1); err != nil {
 					return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
 				} else {
 					if err := AddFlow(subs, DhcpFlowIPV6, Upstream, flowID[0], allocID, gemID, pcp); err != nil {
-						subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(uint32(subs.PonIntf),
+						subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(context.Background(), uint32(subs.PonIntf),
 							ponresourcemanager.FLOW_ID, flowID)
 						return err
 					}
@@ -314,7 +314,7 @@
 		for pos, pbitSet := range strings.TrimPrefix(pBitMap, "0b") {
 			if pbitSet == '1' {
 				pcp := uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
-				if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(uint32(subs.PonIntf),
+				if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(context.Background(), uint32(subs.PonIntf),
 					ponresourcemanager.FLOW_ID, 1); err != nil {
 					return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
 				} else {
@@ -328,7 +328,7 @@
 							log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
 					}
 					if errUs != nil && errDs != nil {
-						subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(uint32(subs.PonIntf),
+						subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(context.Background(), uint32(subs.PonIntf),
 							ponresourcemanager.FLOW_ID, flowID)
 					}
 					if errUs != nil || errDs != nil {
@@ -343,3 +343,23 @@
 	}
 	return nil
 }
+
+func (att AttWorkFlow) ProvisionVoipFlow(subs *Subscriber) error {
+	log.Info("att-workflow-does-not-support-voip-yet--nothing-to-do")
+	return nil // no-op stub: only the log line above runs; subs is not used
+}
+
+func (att AttWorkFlow) ProvisionVodFlow(subs *Subscriber) error {
+	log.Info("att-workflow-does-not-support-vod-yet--nothing-to-do")
+	return nil // no-op stub: only the log line above runs; subs is not used
+}
+
+func (att AttWorkFlow) ProvisionMgmtFlow(subs *Subscriber) error {
+	log.Info("att-workflow-does-not-support-mgmt-yet--nothing-to-do")
+	return nil // no-op stub: only the log line above runs; subs is not used
+}
+
+func (att AttWorkFlow) ProvisionMulticastFlow(subs *Subscriber) error {
+	log.Info("att-workflow-does-not-support-multicast-yet--nothing-to-do")
+	return nil // no-op stub: only the log line above runs; subs is not used
+}
diff --git a/core/dt_workflow.go b/core/dt_workflow.go
index b2ad300..a06b29b 100644
--- a/core/dt_workflow.go
+++ b/core/dt_workflow.go
@@ -21,10 +21,10 @@
 	"strings"
 
 	"github.com/opencord/openolt-scale-tester/config"
-	"github.com/opencord/voltha-lib-go/v2/pkg/log"
-	"github.com/opencord/voltha-lib-go/v2/pkg/ponresourcemanager"
-	oop "github.com/opencord/voltha-protos/v2/go/openolt"
-	tp_pb "github.com/opencord/voltha-protos/v2/go/tech_profile"
+	"github.com/opencord/voltha-lib-go/v3/pkg/log"
+	"github.com/opencord/voltha-lib-go/v3/pkg/ponresourcemanager"
+	oop "github.com/opencord/voltha-protos/v3/go/openolt"
+	tp_pb "github.com/opencord/voltha-protos/v3/go/tech_profile"
 	"golang.org/x/net/context"
 )
 
@@ -156,12 +156,12 @@
 		for pos, pbitSet := range strings.TrimPrefix(pBitMap, "0b") {
 			if pbitSet == '1' {
 				pcp := uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
-				if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(uint32(subs.PonIntf),
+				if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(context.Background(), uint32(subs.PonIntf),
 					ponresourcemanager.FLOW_ID, 1); err != nil {
 					return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
 				} else {
 					if err := AddFlow(subs, HsiaFlow, Upstream, flowID[0], allocID, gemID, pcp); err != nil {
-						subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(uint32(subs.PonIntf),
+						subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(context.Background(), uint32(subs.PonIntf),
 							ponresourcemanager.FLOW_ID, flowID)
 						return err
 					}
@@ -174,3 +174,23 @@
 	}
 	return nil
 }
+
+func (dt DtWorkFlow) ProvisionVoipFlow(subs *Subscriber) error {
+	log.Info("dt-workflow-does-not-support-voip-yet--nothing-to-do")
+	return nil // no-op stub: only the log line above runs; subs is not used
+}
+
+func (dt DtWorkFlow) ProvisionVodFlow(subs *Subscriber) error {
+	log.Info("dt-workflow-does-not-support-vod-yet--nothing-to-do")
+	return nil // no-op stub: only the log line above runs; subs is not used
+}
+
+func (dt DtWorkFlow) ProvisionMgmtFlow(subs *Subscriber) error {
+	log.Info("dt-workflow-does-not-support-mgmt-yet--nothing-to-do")
+	return nil // no-op stub: only the log line above runs; subs is not used
+}
+
+func (dt DtWorkFlow) ProvisionMulticastFlow(subs *Subscriber) error {
+	log.Info("dt-workflow-does-not-support-multicast-yet--nothing-to-do")
+	return nil // no-op stub: only the log line above runs; subs is not used
+}
diff --git a/core/olt_manager.go b/core/olt_manager.go
index 6d18b7e..78dd97e 100644
--- a/core/olt_manager.go
+++ b/core/olt_manager.go
@@ -22,15 +22,6 @@
 	"encoding/json"
 	"errors"
 	"fmt"
-	"github.com/cenkalti/backoff/v3"
-	"github.com/opencord/openolt-scale-tester/config"
-	"github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore"
-	"github.com/opencord/voltha-lib-go/v2/pkg/log"
-	"github.com/opencord/voltha-lib-go/v2/pkg/techprofile"
-	oop "github.com/opencord/voltha-protos/v2/go/openolt"
-	"google.golang.org/grpc"
-	"google.golang.org/grpc/codes"
-	"google.golang.org/grpc/status"
 	"io"
 	"io/ioutil"
 	"os"
@@ -38,6 +29,16 @@
 	"sync"
 	"syscall"
 	"time"
+
+	"github.com/cenkalti/backoff/v3"
+	"github.com/opencord/openolt-scale-tester/config"
+	"github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
+	"github.com/opencord/voltha-lib-go/v3/pkg/log"
+	"github.com/opencord/voltha-lib-go/v3/pkg/techprofile"
+	oop "github.com/opencord/voltha-protos/v3/go/openolt"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 )
 
 const (
@@ -114,12 +115,12 @@
 		}
 		kvPath := fmt.Sprintf(TechProfileKVPath, om.deviceInfo.Technology, tpID)
 		tpJson, err := json.Marshal(tp)
-		err = client.Put(kvPath, tpJson, 2)
+		err = client.Put(context.Background(), kvPath, tpJson)
 		if err != nil {
 			log.Fatalw("tp-put-to-etcd-failed", log.Fields{"tpPath": kvPath, "err": err})
 		}
 		// verify the PUT succeeded.
-		kvResult, err := client.Get(kvPath, 2)
+		kvResult, err := client.Get(context.Background(), kvPath)
 		if kvResult == nil {
 			log.Fatal("tp-not-found-on-kv-after-load", log.Fields{"key": kvPath, "err": err})
 		} else {
@@ -406,7 +407,7 @@
 		log.Debugw("Received Omci indication ", log.Fields{"IntfId": omciInd.IntfId, "OnuId": omciInd.OnuId, "pkt": hex.EncodeToString(omciInd.Pkt)})
 	case *oop.Indication_PktInd:
 		pktInd := indication.GetPktInd()
-		log.Infow("Received pakcet indication ", log.Fields{"PktInd": pktInd})
+		log.Infow("Received packet indication ", log.Fields{"PktInd": pktInd})
 		/*
 				case *oop.Indication_PortStats:
 				portStats := indication.GetPortStats()
diff --git a/core/onu_manager.go b/core/onu_manager.go
index bd1265c..6fa9201 100644
--- a/core/onu_manager.go
+++ b/core/onu_manager.go
@@ -17,11 +17,12 @@
 package core
 
 import (
-	"github.com/opencord/openolt-scale-tester/config"
-	"github.com/opencord/voltha-lib-go/v2/pkg/log"
-	oop "github.com/opencord/voltha-protos/v2/go/openolt"
 	"strconv"
 	"time"
+
+	"github.com/opencord/openolt-scale-tester/config"
+	"github.com/opencord/voltha-lib-go/v3/pkg/log"
+	oop "github.com/opencord/voltha-protos/v3/go/openolt"
 )
 
 func init() {
@@ -72,7 +73,7 @@
 
 		log.Infow("subscriber-provision-started-from-onu-manager", log.Fields{"subsName": subsName})
 		// Start provisioning the subscriber
-		go subs.Start(onuCh)
+		go subs.Start(onuCh, onu.testConfig.IsGroupTest)
 
 		// Wait for subscriber provision to complete
 		<-onuCh
diff --git a/core/resource_manager.go b/core/resource_manager.go
index 1bb5aaf..3be01bc 100644
--- a/core/resource_manager.go
+++ b/core/resource_manager.go
@@ -23,9 +23,10 @@
 	"strconv"
 	"strings"
 
-	"github.com/opencord/voltha-lib-go/v2/pkg/log"
-	ponrmgr "github.com/opencord/voltha-lib-go/v2/pkg/ponresourcemanager"
-	"github.com/opencord/voltha-protos/v2/go/openolt"
+	"github.com/opencord/voltha-lib-go/v3/pkg/log"
+	ponrmgr "github.com/opencord/voltha-lib-go/v3/pkg/ponresourcemanager"
+	"github.com/opencord/voltha-protos/v3/go/openolt"
+	"golang.org/x/net/context"
 )
 
 func init() {
@@ -130,7 +131,7 @@
 	// After we have initialized resource ranges, initialize the
 	// resource pools accordingly.
 	for _, PONRMgr := range RsrcMgrsByTech {
-		_ = PONRMgr.InitDeviceResourcePool()
+		_ = PONRMgr.InitDeviceResourcePool(context.Background())
 	}
 	log.Info("Initialization of  resource manager success!")
 	return &ResourceMgr
@@ -147,7 +148,7 @@
 
 	log.Debugf("Resource range pool init for technology %s", ponRMgr.Technology)
 	// first load from KV profiles
-	status := ponRMgr.InitResourceRangesFromKVStore()
+	status := ponRMgr.InitResourceRangesFromKVStore(context.Background())
 	if !status {
 		log.Debugf("Failed to load resource ranges from KV store for tech %s", ponRMgr.Technology)
 	}
@@ -298,7 +299,7 @@
 	           self.resource_mgrs[pon_intf_id].assert_resource_limits(uni_id, PONResourceManager.UNI_ID)
 	*/
 	for _, rsrcMgr := range RsrcMgr.ResourceMgrs {
-		if err := rsrcMgr.ClearDeviceResourcePool(); err != nil {
+		if err := rsrcMgr.ClearDeviceResourcePool(context.Background()); err != nil {
 			log.Debug("Failed to clear device resource pool")
 			return err
 		}
@@ -315,7 +316,7 @@
 		return 0, err
 	}
 	// Get ONU id for a provided pon interface ID.
-	ONUID, err := RsrcMgr.ResourceMgrs[ponIntfID].GetResourceID(ponIntfID,
+	ONUID, err := RsrcMgr.ResourceMgrs[ponIntfID].GetResourceID(context.Background(), ponIntfID,
 		ponrmgr.ONU_ID, 1)
 	if err != nil {
 		log.Errorf("Failed to get resource for interface %d for type %s",
@@ -323,7 +324,7 @@
 		return 0, err
 	}
 	if ONUID != nil {
-		RsrcMgr.ResourceMgrs[ponIntfID].InitResourceMap(fmt.Sprintf("%d,%d", ponIntfID, ONUID[0]))
+		RsrcMgr.ResourceMgrs[ponIntfID].InitResourceMap(context.Background(), fmt.Sprintf("%d,%d", ponIntfID, ONUID[0]))
 		return ONUID[0], err
 	}
 
@@ -337,7 +338,7 @@
 
 	var err error
 	IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", intfID, onuID, uniID)
-	AllocID := RsrcMgr.ResourceMgrs[intfID].GetCurrentAllocIDForOnu(IntfOnuIDUniID)
+	AllocID := RsrcMgr.ResourceMgrs[intfID].GetCurrentAllocIDForOnu(context.Background(), IntfOnuIDUniID)
 	if AllocID != nil {
 		// Since we support only one alloc_id for the ONU at the moment,
 		// return the first alloc_id in the list, if available, for that
@@ -345,7 +346,7 @@
 		log.Debugw("Retrieved alloc ID from pon resource mgr", log.Fields{"AllocID": AllocID})
 		return AllocID[0]
 	}
-	AllocID, err = RsrcMgr.ResourceMgrs[intfID].GetResourceID(intfID,
+	AllocID, err = RsrcMgr.ResourceMgrs[intfID].GetResourceID(context.Background(), intfID,
 		ponrmgr.ALLOC_ID, 1)
 
 	if AllocID == nil || err != nil {
@@ -354,7 +355,7 @@
 	}
 	// update the resource map on KV store with the list of alloc_id
 	// allocated for the pon_intf_onu_id tuple
-	err = RsrcMgr.ResourceMgrs[intfID].UpdateAllocIdsForOnu(IntfOnuIDUniID, AllocID)
+	err = RsrcMgr.ResourceMgrs[intfID].UpdateAllocIdsForOnu(context.Background(), IntfOnuIDUniID, AllocID)
 	if err != nil {
 		log.Error("Failed to update Alloc ID")
 		return 0
@@ -375,12 +376,12 @@
 	var err error
 	IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", ponPort, onuID, uniID)
 
-	GEMPortList := RsrcMgr.ResourceMgrs[ponPort].GetCurrentGEMPortIDsForOnu(IntfOnuIDUniID)
+	GEMPortList := RsrcMgr.ResourceMgrs[ponPort].GetCurrentGEMPortIDsForOnu(context.Background(), IntfOnuIDUniID)
 	if GEMPortList != nil {
 		return GEMPortList, nil
 	}
 
-	GEMPortList, err = RsrcMgr.ResourceMgrs[ponPort].GetResourceID(ponPort,
+	GEMPortList, err = RsrcMgr.ResourceMgrs[ponPort].GetResourceID(context.Background(), ponPort,
 		ponrmgr.GEMPORT_ID, NumOfPorts)
 	if err != nil && GEMPortList == nil {
 		log.Errorf("Failed to get gem port id for %s", IntfOnuIDUniID)
@@ -389,7 +390,7 @@
 
 	// update the resource map on KV store with the list of gemport_id
 	// allocated for the pon_intf_onu_id tuple
-	err = RsrcMgr.ResourceMgrs[ponPort].UpdateGEMPortIDsForOnu(IntfOnuIDUniID,
+	err = RsrcMgr.ResourceMgrs[ponPort].UpdateGEMPortIDsForOnu(context.Background(), IntfOnuIDUniID,
 		GEMPortList)
 	if err != nil {
 		log.Errorf("Failed to update GEM ports to kv store for %s", IntfOnuIDUniID)
@@ -408,29 +409,29 @@
 
 	FlowIds = append(FlowIds, FlowID)
 	IntfONUID = fmt.Sprintf("%d,%d,%d", IntfID, onuID, uniID)
-	err = RsrcMgr.ResourceMgrs[IntfID].UpdateFlowIDForOnu(IntfONUID, FlowID, false)
+	err = RsrcMgr.ResourceMgrs[IntfID].UpdateFlowIDForOnu(context.Background(), IntfONUID, FlowID, false)
 	if err != nil {
 		log.Errorw("Failed to Update flow id  for", log.Fields{"intf": IntfONUID})
 	}
-	RsrcMgr.ResourceMgrs[IntfID].RemoveFlowIDInfo(IntfONUID, FlowID)
-	RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(IntfID, ponrmgr.FLOW_ID, FlowIds)
+	RsrcMgr.ResourceMgrs[IntfID].RemoveFlowIDInfo(context.Background(), IntfONUID, FlowID)
+	RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(context.Background(), IntfID, ponrmgr.FLOW_ID, FlowIds)
 }
 
 // FreeFlowIDs releases the flow Ids
 func (RsrcMgr *OpenOltResourceMgr) FreeFlowIDs(IntfID uint32, onuID uint32,
 	uniID uint32, FlowID []uint32) {
 
-	RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(IntfID, ponrmgr.FLOW_ID, FlowID)
+	RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(context.Background(), IntfID, ponrmgr.FLOW_ID, FlowID)
 
 	var IntfOnuIDUniID string
 	var err error
 	for _, flow := range FlowID {
 		IntfOnuIDUniID = fmt.Sprintf("%d,%d,%d", IntfID, onuID, uniID)
-		err = RsrcMgr.ResourceMgrs[IntfID].UpdateFlowIDForOnu(IntfOnuIDUniID, flow, false)
+		err = RsrcMgr.ResourceMgrs[IntfID].UpdateFlowIDForOnu(context.Background(), IntfOnuIDUniID, flow, false)
 		if err != nil {
 			log.Errorw("Failed to Update flow id for", log.Fields{"intf": IntfOnuIDUniID})
 		}
-		RsrcMgr.ResourceMgrs[IntfID].RemoveFlowIDInfo(IntfOnuIDUniID, flow)
+		RsrcMgr.ResourceMgrs[IntfID].RemoveFlowIDInfo(context.Background(), IntfOnuIDUniID, flow)
 	}
 }
 
@@ -439,7 +440,7 @@
 func (RsrcMgr *OpenOltResourceMgr) FreeAllocID(IntfID uint32, allocID uint32) {
 	allocIDs := make([]uint32, 0)
 	allocIDs = append(allocIDs, allocID)
-	RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(IntfID, ponrmgr.ALLOC_ID, allocIDs)
+	RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(context.Background(), IntfID, ponrmgr.ALLOC_ID, allocIDs)
 }
 
 // FreeGemPortID frees GemPortID on the PON resource pool and also frees the gemPortID association
@@ -447,5 +448,5 @@
 func (RsrcMgr *OpenOltResourceMgr) FreeGemPortID(IntfID uint32, gemPortID uint32) {
 	gemPortIDs := make([]uint32, 0)
 	gemPortIDs = append(gemPortIDs, gemPortID)
-	RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(IntfID, ponrmgr.GEMPORT_ID, gemPortIDs)
+	RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(context.Background(), IntfID, ponrmgr.GEMPORT_ID, gemPortIDs)
 }
diff --git a/core/subscriber_manager.go b/core/subscriber_manager.go
index 5610d2c..d84fad9 100644
--- a/core/subscriber_manager.go
+++ b/core/subscriber_manager.go
@@ -18,10 +18,12 @@
 
 import (
 	"fmt"
+
 	"github.com/opencord/openolt-scale-tester/config"
-	"github.com/opencord/voltha-lib-go/v2/pkg/log"
-	"github.com/opencord/voltha-lib-go/v2/pkg/techprofile"
-	oop "github.com/opencord/voltha-protos/v2/go/openolt"
+	"github.com/opencord/voltha-lib-go/v3/pkg/log"
+	"github.com/opencord/voltha-lib-go/v3/pkg/techprofile"
+	oop "github.com/opencord/voltha-protos/v3/go/openolt"
+	"golang.org/x/net/context"
 )
 
 func init() {
@@ -86,7 +88,8 @@
 	RsrMgr        *OpenOltResourceMgr
 }
 
-func (subs *Subscriber) Start(onuCh chan bool) {
+func (subs *Subscriber) Start(onuCh chan bool, isGroup bool) {
+	var err error
 
 	log.Infow("workflow-deploy-started-for-subscriber", log.Fields{"subsName": subs.SubscriberName})
 
@@ -94,9 +97,9 @@
 
 	for _, tpID := range subs.TestConfig.TpIDList {
 		uniPortName := fmt.Sprintf(UniPortName, subs.PonIntf, subs.OnuID, subs.UniID)
-		if subs.TpInstance[tpID] =
-			subs.RsrMgr.ResourceMgrs[subs.PonIntf].TechProfileMgr.CreateTechProfInstance(
-				uint32(tpID), uniPortName, subs.PonIntf); subs.TpInstance[tpID] == nil {
+		if subs.TpInstance[tpID], err =
+			subs.RsrMgr.ResourceMgrs[subs.PonIntf].TechProfileMgr.CreateTechProfInstance(context.Background(),
+				uint32(tpID), uniPortName, subs.PonIntf); err != nil {
 			log.Errorw("error-creating-tp-instance-for-subs",
 				log.Fields{"subsName": subs.SubscriberName, "onuID": subs.OnuID, "tpID": tpID})
 
@@ -107,7 +110,7 @@
 		}
 	}
 
-	DeployWorkflow(subs)
+	DeployWorkflow(subs, isGroup)
 
 	log.Infow("workflow-deploy-completed-for-subscriber", log.Fields{"subsName": subs.SubscriberName})
 
diff --git a/core/tt_workflow.go b/core/tt_workflow.go
new file mode 100644
index 0000000..862da20
--- /dev/null
+++ b/core/tt_workflow.go
@@ -0,0 +1,588 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package core
+
+import (
+	"errors"
+	"strings"
+	"sync/atomic"
+
+	"github.com/opencord/openolt-scale-tester/config"
+	"github.com/opencord/voltha-lib-go/v3/pkg/log"
+	"github.com/opencord/voltha-lib-go/v3/pkg/ponresourcemanager"
+	oop "github.com/opencord/voltha-protos/v3/go/openolt"
+	tp_pb "github.com/opencord/voltha-protos/v3/go/tech_profile"
+	"golang.org/x/net/context"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+var lastPonIntf = new(uint32) // shared *uint32 starting at 0; presumably updated via sync/atomic — TODO confirm (its uses are outside this view)
+
+func init() {
+	_, _ = log.AddPackage(log.JSON, log.DebugLevel, nil) // called with JSON format and debug level; both results are discarded
+}
+
+// TtWorkFlow is a dummy, field-less struct that exists to comply with the WorkFlow interface.
+type TtWorkFlow struct {
+}
+
+func AddTtDhcpIPV4Flow(oo oop.OpenoltClient, config *config.OpenOltScaleTesterConfig, rsrMgr *OpenOltResourceMgr) error {
+	var flowID []uint32
+	var err error
+
+	if flowID, err = rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].GetResourceID(context.Background(), uint32(config.NniIntfID),
+		ponresourcemanager.FLOW_ID, 1); err != nil {
+		return err
+	}
+
+	// DHCP IPV4
+	flowClassifier := &oop.Classifier{EthType: 2048, IpProto: 17, SrcPort: 67, DstPort: 68, PktTagType: "double_tag"}
+	actionCmd := &oop.ActionCmd{TrapToHost: true}
+	actionInfo := &oop.Action{Cmd: actionCmd}
+
+	flow := oop.Flow{AccessIntfId: -1, OnuId: -1, UniId: -1, FlowId: flowID[0],
+		FlowType: "downstream", AllocId: -1, GemportId: -1,
+		Classifier: flowClassifier, Action: actionInfo,
+		Priority: 1000, PortNo: uint32(config.NniIntfID)}
+
+	_, err = oo.FlowAdd(context.Background(), &flow)
+
+	st, _ := status.FromError(err)
+	if st.Code() == codes.AlreadyExists {
+		log.Debugw("Flow already exists", log.Fields{"err": err, "deviceFlow": flow})
+		return nil
+	}
+
+	if err != nil {
+		log.Errorw("Failed to Add DHCP IPv4 to device", log.Fields{"err": err, "deviceFlow": flow})
+		rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].FreeResourceID(context.Background(), uint32(config.NniIntfID),
+			ponresourcemanager.FLOW_ID, flowID)
+		return err
+	}
+	log.Debugw("DHCP IPV4 added to device successfully ", log.Fields{"flow": flow})
+
+	return nil
+}
+
+func AddTtDhcpIPV6Flow(oo oop.OpenoltClient, config *config.OpenOltScaleTesterConfig, rsrMgr *OpenOltResourceMgr) error {
+	log.Info("tt-workflow-does-not-require-dhcp-ipv6-support--nothing-to-do")
+	return nil // no-op: none of the three parameters is used and nothing is sent to the device
+}
+
+func ProvisionTtNniTrapFlow(oo oop.OpenoltClient, config *config.OpenOltScaleTesterConfig, rsrMgr *OpenOltResourceMgr) error {
+	_ = AddTtDhcpIPV4Flow(oo, config, rsrMgr) // the error is explicitly discarded here
+	return nil // so this function returns nil even when the DHCP IPv4 trap flow was not added
+}
+
+func FormatTtClassfierAction(flowType string, direction string, subs *Subscriber) (oop.Classifier, oop.Action) { // subs is not referenced in the body
+	var flowClassifier oop.Classifier
+	var actionCmd oop.ActionCmd
+	var actionInfo oop.Action
+
+	if direction == Upstream { // upstream: every case matches SingleTag; all but IGMP set AddOuterTag
+		switch flowType {
+		case IgmpFlow: // IGMP is trapped to host instead of being re-tagged
+			flowClassifier.EthType = IPv4EthType
+			flowClassifier.IpProto = IgmpProto
+			flowClassifier.SrcPort = 0
+			flowClassifier.DstPort = 0
+			flowClassifier.PktTagType = SingleTag
+			actionCmd.TrapToHost = true
+			actionInfo.Cmd = &actionCmd
+		case HsiaFlow:
+			actionCmd.AddOuterTag = true
+			actionInfo.Cmd = &actionCmd
+			actionInfo.IVid = 33
+			actionInfo.OVid = 7
+			flowClassifier.IPbits = 255
+			flowClassifier.OVid = 33
+			flowClassifier.PktTagType = SingleTag
+		case VoipFlow:
+			actionCmd.AddOuterTag = true
+			actionInfo.Cmd = &actionCmd
+			actionInfo.OPbits = 7
+			actionInfo.IVid = 63
+			actionInfo.OVid = 10
+			flowClassifier.IPbits = 255
+			flowClassifier.OPbits = 7
+			flowClassifier.OVid = 63
+			flowClassifier.PktTagType = SingleTag
+		case VodFlow:
+			actionCmd.AddOuterTag = true
+			actionInfo.Cmd = &actionCmd
+			actionInfo.OPbits = 5
+			actionInfo.IVid = 55
+			actionInfo.OVid = 555
+			flowClassifier.IPbits = 255
+			flowClassifier.OPbits = 5
+			flowClassifier.OVid = 55
+			flowClassifier.PktTagType = SingleTag
+		case MgmtFlow:
+			actionCmd.AddOuterTag = true
+			actionInfo.Cmd = &actionCmd
+			actionInfo.OPbits = 7
+			actionInfo.IVid = 75
+			actionInfo.OVid = 575
+			flowClassifier.IPbits = 255
+			flowClassifier.OPbits = 7
+			flowClassifier.OVid = 75
+			flowClassifier.PktTagType = SingleTag
+		default: // log only; classifier and action stay zero-valued
+			log.Errorw("Unsupported TT flow type", log.Fields{"flowtype": flowType,
+				"direction": direction})
+		}
+	} else if direction == Downstream { // downstream: every populated case matches DoubleTag and sets RemoveOuterTag
+		switch flowType {
+		case IgmpFlow: // log only; classifier and action stay zero-valued
+			log.Errorw("Downstream IGMP flows are not required instead we have "+
+				"IGMP trap flows already installed", log.Fields{"flowtype": flowType,
+				"direction": direction})
+		case HsiaFlow:
+			actionCmd.RemoveOuterTag = true
+			actionInfo.Cmd = &actionCmd
+			actionInfo.IVid = 33
+			flowClassifier.IPbits = 255
+			flowClassifier.OPbits = 255
+			flowClassifier.IVid = 33
+			flowClassifier.OVid = 7
+			flowClassifier.PktTagType = DoubleTag
+		case VoipFlow:
+			actionCmd.RemoveOuterTag = true
+			actionInfo.Cmd = &actionCmd
+			actionInfo.IPbits = 7
+			actionInfo.IVid = 63
+			flowClassifier.IPbits = 255
+			flowClassifier.OPbits = 255
+			flowClassifier.IVid = 63
+			flowClassifier.OVid = 10
+			flowClassifier.DstMac = GenerateMac(true) // GenerateMac is defined outside this view; meaning of its bool argument not shown
+			flowClassifier.PktTagType = DoubleTag
+		case VodFlow:
+			actionCmd.RemoveOuterTag = true
+			actionInfo.Cmd = &actionCmd
+			actionInfo.IPbits = 5
+			actionInfo.IVid = 55
+			flowClassifier.IPbits = 255
+			flowClassifier.OPbits = 255
+			flowClassifier.IVid = 55
+			flowClassifier.OVid = 555
+			flowClassifier.DstMac = GenerateMac(true)
+			flowClassifier.PktTagType = DoubleTag
+		case MgmtFlow:
+			actionCmd.RemoveOuterTag = true
+			actionInfo.Cmd = &actionCmd
+			actionInfo.IPbits = 7
+			actionInfo.IVid = 75
+			flowClassifier.IPbits = 255
+			flowClassifier.OPbits = 255
+			flowClassifier.IVid = 75
+			flowClassifier.OVid = 575
+			flowClassifier.DstMac = GenerateMac(true)
+			flowClassifier.PktTagType = DoubleTag
+		default: // log only; classifier and action stay zero-valued
+			log.Errorw("Unsupported TT flow type", log.Fields{"flowtype": flowType,
+				"direction": direction})
+		}
+	}
+	return flowClassifier, actionInfo // any direction other than Upstream/Downstream also yields zero values, without a log line
+}
+
+func AddTtFlow(subs *Subscriber, flowType string, direction string, flowID uint32,
+	allocID uint32, gemID uint32, pcp uint32) error {
+	log.Infow("add-flow", log.Fields{"WorkFlow": subs.TestConfig.WorkflowName, "FlowType": flowType,
+		"direction": direction, "flowID": flowID})
+	var err error
+
+	flowClassifier, actionInfo := FormatTtClassfierAction(flowType, direction, subs)
+	// Update the o_pbit for which this flow has to be classified
+	flowClassifier.OPbits = pcp
+	flow := oop.Flow{AccessIntfId: int32(subs.PonIntf), OnuId: int32(subs.OnuID),
+		UniId: int32(subs.UniID), FlowId: flowID,
+		FlowType: direction, AllocId: int32(allocID), GemportId: int32(gemID),
+		Classifier: &flowClassifier, Action: &actionInfo,
+		Priority: 1000, PortNo: subs.UniPortNo}
+
+	_, err = subs.OpenOltClient.FlowAdd(context.Background(), &flow)
+
+	st, _ := status.FromError(err)
+	if st.Code() == codes.AlreadyExists {
+		log.Debugw("Flow already exists", log.Fields{"err": err, "deviceFlow": flow})
+		return nil
+	}
+
+	if err != nil {
+		log.Errorw("Failed to Add flow to device", log.Fields{"err": err, "deviceFlow": flow})
+		return errors.New(ReasonCodeToReasonString(FLOW_ADD_FAILED))
+	}
+	log.Debugw("Flow added to device successfully ", log.Fields{"flow": flow})
+
+	return nil
+}
+
+func (tt TtWorkFlow) ProvisionScheds(subs *Subscriber) error {
+	var trafficSched []*tp_pb.TrafficScheduler
+
+	log.Info("provisioning-scheds")
+
+	if trafficSched = getTrafficSched(subs, tp_pb.Direction_DOWNSTREAM); trafficSched == nil {
+		log.Error("ds-traffic-sched-is-nil")
+		return errors.New(ReasonCodeToReasonString(SCHED_CREATION_FAILED))
+	}
+
+	log.Debugw("Sending Traffic scheduler create to device",
+		log.Fields{"Direction": tp_pb.Direction_DOWNSTREAM, "TrafficScheds": trafficSched})
+	if _, err := subs.OpenOltClient.CreateTrafficSchedulers(context.Background(), &tp_pb.TrafficSchedulers{
+		IntfId: subs.PonIntf, OnuId: subs.OnuID,
+		UniId: subs.UniID, PortNo: subs.UniPortNo,
+		TrafficScheds: trafficSched}); err != nil {
+		log.Errorw("Failed to create traffic schedulers", log.Fields{"error": err})
+		return errors.New(ReasonCodeToReasonString(SCHED_CREATION_FAILED))
+	}
+
+	if trafficSched = getTrafficSched(subs, tp_pb.Direction_UPSTREAM); trafficSched == nil {
+		log.Error("us-traffic-sched-is-nil")
+		return errors.New(ReasonCodeToReasonString(SCHED_CREATION_FAILED))
+	}
+
+	log.Debugw("Sending Traffic scheduler create to device",
+		log.Fields{"Direction": tp_pb.Direction_UPSTREAM, "TrafficScheds": trafficSched})
+	if _, err := subs.OpenOltClient.CreateTrafficSchedulers(context.Background(), &tp_pb.TrafficSchedulers{
+		IntfId: subs.PonIntf, OnuId: subs.OnuID,
+		UniId: subs.UniID, PortNo: subs.UniPortNo,
+		TrafficScheds: trafficSched}); err != nil {
+		log.Errorw("Failed to create traffic schedulers", log.Fields{"error": err})
+		return errors.New(ReasonCodeToReasonString(SCHED_CREATION_FAILED))
+	}
+	return nil
+}
+
+// ProvisionQueues creates the subscriber's traffic queues on the device,
+// first for the downstream direction and then for the upstream direction.
+//
+// For each direction the queue list comes from getTrafficQueues and is sent
+// with a CreateTrafficQueues request. If the list is nil or the request
+// fails, the problem is logged and an error carrying the
+// QUEUE_CREATION_FAILED reason string is returned; the later direction is
+// then not attempted. Returns nil when both directions succeed.
+func (tt TtWorkFlow) ProvisionQueues(subs *Subscriber) error {
+	log.Info("provisioning-queues")
+
+	// Order matters: downstream is provisioned before upstream.
+	directions := []tp_pb.Direction{tp_pb.Direction_DOWNSTREAM, tp_pb.Direction_UPSTREAM}
+	for _, dir := range directions {
+		queues := getTrafficQueues(subs, dir)
+		if queues == nil {
+			log.Error("Failed to create traffic queues")
+			return errors.New(ReasonCodeToReasonString(QUEUE_CREATION_FAILED))
+		}
+
+		// On receiving the CreateTrafficQueues request, the driver should
+		// create the corresponding queues for this direction.
+		log.Debugw("Sending Traffic Queues create to device",
+			log.Fields{"Direction": dir, "TrafficQueues": queues})
+		request := &tp_pb.TrafficQueues{
+			IntfId:        subs.PonIntf,
+			OnuId:         subs.OnuID,
+			UniId:         subs.UniID,
+			PortNo:        subs.UniPortNo,
+			TrafficQueues: queues,
+		}
+		_, err := subs.OpenOltClient.CreateTrafficQueues(context.Background(), request)
+		if err != nil {
+			log.Errorw("Failed to create traffic queues in device", log.Fields{"error": err})
+			return errors.New(ReasonCodeToReasonString(QUEUE_CREATION_FAILED))
+		}
+	}
+
+	return nil
+}
+
+func (tt TtWorkFlow) ProvisionEapFlow(subs *Subscriber) error {
+	log.Info("tt-workflow-does-not-support-eap-yet--nothing-to-do")
+	return nil // no EAP flow is installed for the TT workflow; subs is not read and the step always succeeds
+}
+
+func (tt TtWorkFlow) ProvisionDhcpIPV4Flow(subs *Subscriber) error {
+	log.Info("tt-workflow-does-not-require-dhcp-ipv4-yet--nothing-to-do")
+	return nil // no standalone DHCPv4 flow is installed by this step; subs is not read and it always succeeds
+}
+
+func (tt TtWorkFlow) ProvisionDhcpIPV6Flow(subs *Subscriber) error {
+	log.Info("tt-workflow-does-not-require-dhcp-ipv6-support--nothing-to-do")
+	return nil // no DHCPv6 flow is installed for the TT workflow; subs is not read and the step always succeeds
+}
+
+func (tt TtWorkFlow) ProvisionIgmpFlow(subs *Subscriber) error {
+	log.Info("tt-workflow-does-not-require-igmp-support--nothing-to-do")
+	return nil // no standalone IGMP flow is installed by this step; subs is not read and it always succeeds
+}
+
+// ProvisionHsiaFlow installs an upstream and a downstream HSIA flow for every
+// p-bit set in the PbitMap of each upstream gem port of the first tech profile
+// (TpIDList[0]). Both directions share one flow ID reserved on the subscriber's
+// PON; the ID is freed only if both installs fail. The first p-bit with a
+// failure ends provisioning: its upstream error, else its downstream error, is returned.
+func (tt TtWorkFlow) ProvisionHsiaFlow(subs *Subscriber) error {
+	tpInst := subs.TpInstance[subs.TestConfig.TpIDList[0]]
+	allocID := tpInst.UsScheduler.AllocID
+	for _, gem := range tpInst.UpstreamGemPortAttributeList {
+		bits := strings.TrimPrefix(gem.PbitMap, "0b")
+		for pos, bit := range bits {
+			if bit != '1' {
+				continue
+			}
+			pcp := uint32(len(bits)) - 1 - uint32(pos) // the last character of the map is PCP 0
+			flowID, err := subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(context.Background(),
+				uint32(subs.PonIntf), ponresourcemanager.FLOW_ID, 1)
+			if err != nil {
+				return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
+			}
+			errUs := AddTtFlow(subs, HsiaFlow, Upstream, flowID[0], allocID, gem.GemportID, pcp)
+			if errUs != nil {
+				log.Errorw("failed to install US HSIA flow",
+					log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
+			}
+			errDs := AddTtFlow(subs, HsiaFlow, Downstream, flowID[0], allocID, gem.GemportID, pcp)
+			if errDs != nil {
+				log.Errorw("failed to install DS HSIA flow",
+					log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
+			}
+			if errUs != nil && errDs != nil {
+				subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(context.Background(),
+					uint32(subs.PonIntf), ponresourcemanager.FLOW_ID, flowID)
+			}
+			if errUs != nil {
+				return errUs
+			}
+			if errDs != nil {
+				return errDs
+			}
+		}
+	}
+	return nil
+}
+
+// ProvisionVoipFlow installs, for every p-bit set in the PbitMap of each
+// upstream gem port of the first tech profile (TpIDList[0]), the upstream and
+// downstream VOIP flows plus an upstream DHCPv4 flow. All three share one flow
+// ID reserved on the subscriber's PON; it is freed only if all three installs
+// fail. The first failing p-bit ends provisioning (US, then DS, then DHCP error).
+func (tt TtWorkFlow) ProvisionVoipFlow(subs *Subscriber) error {
+	tpInst := subs.TpInstance[subs.TestConfig.TpIDList[0]]
+	allocID := tpInst.UsScheduler.AllocID
+	for _, gem := range tpInst.UpstreamGemPortAttributeList {
+		bits := strings.TrimPrefix(gem.PbitMap, "0b")
+		for pos, bit := range bits {
+			if bit != '1' {
+				continue
+			}
+			pcp := uint32(len(bits)) - 1 - uint32(pos) // the last character of the map is PCP 0
+			flowID, err := subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(context.Background(),
+				uint32(subs.PonIntf), ponresourcemanager.FLOW_ID, 1)
+			if err != nil {
+				return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
+			}
+			errUs := AddTtFlow(subs, VoipFlow, Upstream, flowID[0], allocID, gem.GemportID, pcp)
+			if errUs != nil {
+				log.Errorw("failed to install US VOIP flow",
+					log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
+			}
+			errDs := AddTtFlow(subs, VoipFlow, Downstream, flowID[0], allocID, gem.GemportID, pcp)
+			if errDs != nil {
+				log.Errorw("failed to install DS VOIP flow",
+					log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
+			}
+			errDhcp := AddFlow(subs, DhcpFlowIPV4, Upstream, flowID[0], allocID, gem.GemportID, pcp)
+			if errDhcp != nil {
+				log.Errorw("failed to install US VOIP-DHCP flow",
+					log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
+			}
+			if errUs != nil && errDs != nil && errDhcp != nil {
+				subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(context.Background(),
+					uint32(subs.PonIntf), ponresourcemanager.FLOW_ID, flowID)
+			}
+			switch {
+			case errUs != nil:
+				return errUs
+			case errDs != nil:
+				return errDs
+			case errDhcp != nil:
+				return errDhcp
+			}
+		}
+	}
+	return nil
+}
+
+// ProvisionVodFlow installs, for every p-bit set in the PbitMap of each upstream
+// gem port of the first tech profile (TpIDList[0]), the upstream and downstream
+// VOD flows plus upstream DHCPv4 and IGMP flows. All four share one flow ID
+// reserved on the subscriber's PON; it is freed only if all four installs fail.
+// The first failing p-bit ends provisioning (US, DS, DHCP, then IGMP error).
+func (tt TtWorkFlow) ProvisionVodFlow(subs *Subscriber) error {
+	tpInst := subs.TpInstance[subs.TestConfig.TpIDList[0]]
+	allocID := tpInst.UsScheduler.AllocID
+	for _, gem := range tpInst.UpstreamGemPortAttributeList {
+		bits := strings.TrimPrefix(gem.PbitMap, "0b")
+		for pos, bit := range bits {
+			if bit != '1' {
+				continue
+			}
+			pcp := uint32(len(bits)) - 1 - uint32(pos) // the last character of the map is PCP 0
+			flowID, err := subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(context.Background(),
+				uint32(subs.PonIntf), ponresourcemanager.FLOW_ID, 1)
+			if err != nil {
+				return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
+			}
+			errUs := AddTtFlow(subs, VodFlow, Upstream, flowID[0], allocID, gem.GemportID, pcp)
+			if errUs != nil {
+				log.Errorw("failed to install US VOD flow",
+					log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
+			}
+			errDs := AddTtFlow(subs, VodFlow, Downstream, flowID[0], allocID, gem.GemportID, pcp)
+			if errDs != nil {
+				log.Errorw("failed to install DS VOD flow",
+					log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
+			}
+			errDhcp := AddFlow(subs, DhcpFlowIPV4, Upstream, flowID[0], allocID, gem.GemportID, pcp)
+			if errDhcp != nil {
+				log.Errorw("failed to install US VOD-DHCP flow",
+					log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
+			}
+			errIgmp := AddTtFlow(subs, IgmpFlow, Upstream, flowID[0], allocID, gem.GemportID, pcp)
+			if errIgmp != nil {
+				log.Errorw("failed to install US VOD-IGMP flow",
+					log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
+			}
+			if errUs != nil && errDs != nil && errDhcp != nil && errIgmp != nil {
+				subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(context.Background(),
+					uint32(subs.PonIntf), ponresourcemanager.FLOW_ID, flowID)
+			}
+			switch {
+			case errUs != nil:
+				return errUs
+			case errDs != nil:
+				return errDs
+			case errDhcp != nil:
+				return errDhcp
+			case errIgmp != nil:
+				return errIgmp
+			}
+		}
+	}
+	return nil
+}
+
+// ProvisionMgmtFlow installs, for every p-bit set in the PbitMap of each
+// upstream gem port of the first tech profile (TpIDList[0]), the upstream and
+// downstream MGMT flows plus an upstream DHCPv4 flow. All three share one flow
+// ID reserved on the subscriber's PON; it is freed only if all three installs
+// fail. The first failing p-bit ends provisioning (US, then DS, then DHCP error).
+func (tt TtWorkFlow) ProvisionMgmtFlow(subs *Subscriber) error {
+	tpInst := subs.TpInstance[subs.TestConfig.TpIDList[0]]
+	allocID := tpInst.UsScheduler.AllocID
+	for _, gem := range tpInst.UpstreamGemPortAttributeList {
+		bits := strings.TrimPrefix(gem.PbitMap, "0b")
+		for pos, bit := range bits {
+			if bit != '1' {
+				continue
+			}
+			pcp := uint32(len(bits)) - 1 - uint32(pos) // the last character of the map is PCP 0
+			flowID, err := subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(context.Background(),
+				uint32(subs.PonIntf), ponresourcemanager.FLOW_ID, 1)
+			if err != nil {
+				return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
+			}
+			errUs := AddTtFlow(subs, MgmtFlow, Upstream, flowID[0], allocID, gem.GemportID, pcp)
+			if errUs != nil {
+				log.Errorw("failed to install US MGMT flow",
+					log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
+			}
+			errDs := AddTtFlow(subs, MgmtFlow, Downstream, flowID[0], allocID, gem.GemportID, pcp)
+			if errDs != nil {
+				log.Errorw("failed to install DS MGMT flow",
+					log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
+			}
+			errDhcp := AddFlow(subs, DhcpFlowIPV4, Upstream, flowID[0], allocID, gem.GemportID, pcp)
+			if errDhcp != nil {
+				log.Errorw("failed to install US MGMT-DHCP flow",
+					log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
+			}
+			if errUs != nil && errDs != nil && errDhcp != nil {
+				subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(context.Background(),
+					uint32(subs.PonIntf), ponresourcemanager.FLOW_ID, flowID)
+			}
+			switch {
+			case errUs != nil:
+				return errUs
+			case errDs != nil:
+				return errDs
+			case errDhcp != nil:
+				return errDhcp
+			}
+		}
+	}
+	return nil
+}
+
+// ProvisionMulticastFlow provisions the multicast group pieces for one
+// subscriber by filling in a GroupData and handing it to AddMulticastQueueFlow.
+//
+// The group ID is the subscriber's ONU ID. The group itself and the multicast
+// flow are added only for subscribers on PON 0. The scheduler and queue are
+// added only by the caller whose PON equals the current value of lastPonIntf,
+// which is then advanced by one; the subscriber is always added as a member.
+// Returns the error from AddMulticastQueueFlow, which is also logged here.
+func (tt TtWorkFlow) ProvisionMulticastFlow(subs *Subscriber) error {
+	grp := GroupData{
+		Subs:        *subs,
+		GroupID:     subs.OnuID,
+		Weight:      20,
+		Priority:    0,
+		OnuID:       6666,
+		UniID:       6666,
+		AllocID:     0,
+		GemPortID:   4069,
+		SchedPolicy: tp_pb.SchedulingPolicy_WRR,
+		AddGroup:    subs.PonIntf == 0,
+		AddFlow:     subs.PonIntf == 0,
+		AddMember:   true,
+	}
+
+	// Only used for the debug log; skip the division when the device reports
+	// no PON ports instead of panicking on a divide by zero.
+	var numOfONUsPerPon uint32
+	if ponPorts := uint(subs.RsrMgr.deviceInfo.GetPonPorts()); ponPorts != 0 {
+		numOfONUsPerPon = uint32(subs.TestConfig.NumOfOnu / ponPorts)
+	}
+	log.Debugw("Group data", log.Fields{"OnuID": subs.OnuID, "GroupID": grp.GroupID, "numOfONUsPerPon": numOfONUsPerPon})
+
+	// Claim this PON's scheduler/queue in one atomic step so two subscribers
+	// on the same PON cannot both observe the old value and both add them.
+	if atomic.CompareAndSwapUint32(lastPonIntf, subs.PonIntf, subs.PonIntf+1) {
+		grp.AddSched = true
+		grp.AddQueue = true
+	}
+
+	err := AddMulticastQueueFlow(&grp)
+	if err != nil {
+		log.Errorw("Failed to add multicast flow", log.Fields{"error": err})
+	}
+
+	return err
+}
diff --git a/core/utils.go b/core/utils.go
index a99096e..b239443 100644
--- a/core/utils.go
+++ b/core/utils.go
@@ -19,8 +19,8 @@
 import (
 	"fmt"
 
-	"github.com/opencord/voltha-lib-go/v2/pkg/log"
-	"github.com/opencord/voltha-protos/v2/go/openolt"
+	"github.com/opencord/voltha-lib-go/v3/pkg/log"
+	"github.com/opencord/voltha-protos/v3/go/openolt"
 )
 
 type DtStagKey struct {
@@ -31,12 +31,14 @@
 var DtStag map[DtStagKey]uint32
 var DtCtag map[uint32]uint32
 var AttCtag map[uint32]uint32
+var TtCtag map[uint32]uint32
 
 func init() {
 	_, _ = log.AddPackage(log.JSON, log.DebugLevel, nil)
 	AttCtag = make(map[uint32]uint32)
 	DtCtag = make(map[uint32]uint32)
 	DtStag = make(map[DtStagKey]uint32)
+	TtCtag = make(map[uint32]uint32)
 }
 
 const (
@@ -97,6 +99,18 @@
 	return DtCtag[ponIntf]
 }
 
+// GetTtCtag hands out the next C-tag for the given PON interface.
+//
+// Tags are tracked per PON in the package-level TtCtag map: the first call for
+// a PON returns 1 and every later call returns the previous value plus one.
+// NOTE(review): TtCtag is read and written here without any locking visible in
+// this function — confirm callers serialize access.
+func GetTtCtag(ponIntf uint32) uint32 {
+	next := TtCtag[ponIntf] + 1 // a missing key reads as 0, so counting starts at 1
+	TtCtag[ponIntf] = next
+	return next
+}
+
 func GetAttStag(ponIntf uint32) uint32 {
 	// start with stag 2
 	return ponIntf + 2
@@ -115,6 +129,11 @@
 	return DtStag[key]
 }
 
+func GetTtStag(ponIntf uint32) uint32 {
+	// The S-tag is the PON interface index offset by 2, so PON 0 starts with S-tag 2.
+	return ponIntf + 2
+}
+
 // TODO: More workflow support to be added here
 func GetCtag(workFlowName string, ponIntf uint32) uint32 {
 	switch workFlowName {
@@ -122,6 +141,8 @@
 		return GetAttCtag(ponIntf)
 	case "DT":
 		return GetDtCtag(ponIntf)
+	case "TT":
+		return GetTtCtag(ponIntf)
 	default:
 		log.Errorw("unknown-workflowname", log.Fields{"workflow": workFlowName})
 	}
@@ -134,6 +155,8 @@
 		return GetAttStag(ponIntf)
 	case "DT":
 		return GetDtStag(ponIntf, onuID, uniID)
+	case "TT":
+		return GetTtStag(ponIntf)
 	default:
 		log.Errorw("unknown-workflowname", log.Fields{"workflow": workFlowName})
 	}
diff --git a/core/workflow_manager.go b/core/workflow_manager.go
index 8ab8551..1112a7e 100644
--- a/core/workflow_manager.go
+++ b/core/workflow_manager.go
@@ -18,9 +18,10 @@
 
 import (
 	"errors"
+
 	"github.com/opencord/openolt-scale-tester/config"
-	"github.com/opencord/voltha-lib-go/v2/pkg/log"
-	oop "github.com/opencord/voltha-protos/v2/go/openolt"
+	"github.com/opencord/voltha-lib-go/v3/pkg/log"
+	oop "github.com/opencord/voltha-protos/v3/go/openolt"
 )
 
 func init() {
@@ -35,46 +36,72 @@
 	ProvisionDhcpIPV6Flow(subs *Subscriber) error
 	ProvisionIgmpFlow(subs *Subscriber) error
 	ProvisionHsiaFlow(subs *Subscriber) error
+	ProvisionVoipFlow(subs *Subscriber) error
+	ProvisionVodFlow(subs *Subscriber) error
+	ProvisionMgmtFlow(subs *Subscriber) error
+	ProvisionMulticastFlow(subs *Subscriber) error
 	// TODO: Add new items here as needed.
 }
 
-func DeployWorkflow(subs *Subscriber) {
+func DeployWorkflow(subs *Subscriber, isGroup bool) {
 	var wf = getWorkFlow(subs)
 
-	// TODO: Catch and log errors for below items if needed.
-	if err := wf.ProvisionScheds(subs); err != nil {
-		subs.Reason = err.Error()
-		return
-	}
+	if isGroup {
+		if err := wf.ProvisionMulticastFlow(subs); err != nil {
+			subs.Reason = err.Error()
+			return
+		}
+	} else {
+		// TODO: Catch and log errors for below items if needed.
+		if err := wf.ProvisionScheds(subs); err != nil {
+			subs.Reason = err.Error()
+			return
+		}
 
-	if err := wf.ProvisionQueues(subs); err != nil {
-		subs.Reason = err.Error()
-		return
-	}
+		if err := wf.ProvisionQueues(subs); err != nil {
+			subs.Reason = err.Error()
+			return
+		}
 
-	if err := wf.ProvisionEapFlow(subs); err != nil {
-		subs.Reason = err.Error()
-		return
-	}
+		if err := wf.ProvisionEapFlow(subs); err != nil {
+			subs.Reason = err.Error()
+			return
+		}
 
-	if err := wf.ProvisionDhcpIPV4Flow(subs); err != nil {
-		subs.Reason = err.Error()
-		return
-	}
+		if err := wf.ProvisionDhcpIPV4Flow(subs); err != nil {
+			subs.Reason = err.Error()
+			return
+		}
 
-	if err := wf.ProvisionDhcpIPV6Flow(subs); err != nil {
-		subs.Reason = err.Error()
-		return
-	}
+		if err := wf.ProvisionDhcpIPV6Flow(subs); err != nil {
+			subs.Reason = err.Error()
+			return
+		}
 
-	if err := wf.ProvisionIgmpFlow(subs); err != nil {
-		subs.Reason = err.Error()
-		return
-	}
+		if err := wf.ProvisionIgmpFlow(subs); err != nil {
+			subs.Reason = err.Error()
+			return
+		}
 
-	if err := wf.ProvisionHsiaFlow(subs); err != nil {
-		subs.Reason = err.Error()
-		return
+		if err := wf.ProvisionHsiaFlow(subs); err != nil {
+			subs.Reason = err.Error()
+			return
+		}
+
+		if err := wf.ProvisionVoipFlow(subs); err != nil {
+			subs.Reason = err.Error()
+			return
+		}
+
+		if err := wf.ProvisionVodFlow(subs); err != nil {
+			subs.Reason = err.Error()
+			return
+		}
+
+		if err := wf.ProvisionMgmtFlow(subs); err != nil {
+			subs.Reason = err.Error()
+			return
+		}
 	}
 
 	subs.Reason = ReasonCodeToReasonString(SUBSCRIBER_PROVISION_SUCCESS)
@@ -88,6 +115,9 @@
 	case "DT":
 		log.Info("chosen-dt-workflow")
 		return DtWorkFlow{}
+	case "TT":
+		log.Info("chosen-tt-workflow")
+		return TtWorkFlow{}
 	// TODO: Add new workflow here
 	default:
 		log.Errorw("operator-workflow-not-supported-yet", log.Fields{"workflowName": subs.TestConfig.WorkflowName})
@@ -109,6 +139,11 @@
 			log.Error("error-installing-flow", log.Fields{"err": err})
 			return err
 		}
+	case "TT":
+		if err := ProvisionTtNniTrapFlow(oo, config, rsrMgr); err != nil {
+			log.Error("error-installing-flow", log.Fields{"err": err})
+			return err
+		}
 	// TODO: Add new items here
 	default:
 		log.Errorw("operator-workflow-not-supported-yet", log.Fields{"workflowName": config.WorkflowName})
diff --git a/core/workflow_utils.go b/core/workflow_utils.go
index 4376760..4ab7356 100644
--- a/core/workflow_utils.go
+++ b/core/workflow_utils.go
@@ -18,12 +18,13 @@
 
 import (
 	"errors"
+	"math/rand"
 
 	"github.com/opencord/openolt-scale-tester/config"
-	"github.com/opencord/voltha-lib-go/v2/pkg/log"
-	"github.com/opencord/voltha-lib-go/v2/pkg/ponresourcemanager"
-	oop "github.com/opencord/voltha-protos/v2/go/openolt"
-	tp_pb "github.com/opencord/voltha-protos/v2/go/tech_profile"
+	"github.com/opencord/voltha-lib-go/v3/pkg/log"
+	"github.com/opencord/voltha-lib-go/v3/pkg/ponresourcemanager"
+	oop "github.com/opencord/voltha-protos/v3/go/openolt"
+	tp_pb "github.com/opencord/voltha-protos/v3/go/tech_profile"
 	"golang.org/x/net/context"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
@@ -60,20 +61,57 @@
 	Untagged      = "untagged"
 	SingleTag     = "single_tag"
 	DoubleTag     = "double_tag"
+
+	VoipFlow = "VOIP_FLOW"
+
+	VodFlow = "VOD_FLOW"
+
+	MgmtFlow = "MGMT_FLOW"
+
+	IgmpProto = 2
+	IgmpFlow  = "IGMP_FLOW"
 )
 
+const (
+	MacSize = 6
+	MacMin  = 0x0
+	MacMax  = 0xFF
+)
+
+type GroupData struct { // inputs for provisioning one multicast group; consumed by AddMulticastQueueFlow and its helpers
+	Subs        Subscriber             `json:"subscriber"`  // copy of the subscriber: supplies the OpenOLT client and the PON/ONU/UNI IDs used in requests
+	GroupID     uint32                 `json:"groupID"`     // group ID placed in oop.Group requests and in the multicast flow
+	Weight      uint32                 `json:"weight"`      // weight of the multicast traffic queue and scheduler
+	Priority    uint32                 `json:"priority"`    // priority of the queue, scheduler, group member and flow
+	OnuID       uint32                 `json:"onuID"`       // NOTE(review): not read by the helpers visible here (they use Subs.OnuID) — confirm use
+	UniID       uint32                 `json:"uniID"`       // NOTE(review): not read by the helpers visible here (they use Subs.UniID) — confirm use
+	AllocID     uint32                 `json:"allocId"`     // alloc ID placed in the multicast flow
+	GemPortID   uint32                 `json:"gemPortIds"`  // single gem port used for the queue, group member and flow
+	SchedPolicy tp_pb.SchedulingPolicy `json:"schedPolicy"` // scheduling policy of the multicast traffic queue
+	AddGroup    bool                   `json:"addGroup"`    // AddMulticastQueueFlow: run CreateGroup
+	AddFlow     bool                   `json:"addFlow"`     // AddMulticastQueueFlow: run AddMulticastFlow
+	AddSched    bool                   `json:"addSched"`    // AddMulticastQueueFlow: run AddMulticastSched
+	AddQueue    bool                   `json:"addQueue"`    // AddMulticastQueueFlow: create the multicast traffic queue
+	AddMember   bool                   `json:"addMember"`   // AddMulticastQueueFlow: add the PON member to the group
+}
+
 func getTrafficSched(subs *Subscriber, direction tp_pb.Direction) []*tp_pb.TrafficScheduler {
 	var SchedCfg *tp_pb.SchedulerConfig
+	var err error
 
 	if direction == tp_pb.Direction_DOWNSTREAM {
-		SchedCfg = subs.RsrMgr.ResourceMgrs[subs.PonIntf].TechProfileMgr.
+		SchedCfg, err = subs.RsrMgr.ResourceMgrs[subs.PonIntf].TechProfileMgr.
 			GetDsScheduler(subs.TpInstance[subs.TestConfig.TpIDList[0]])
-
 	} else {
-		SchedCfg = subs.RsrMgr.ResourceMgrs[subs.PonIntf].TechProfileMgr.
+		SchedCfg, err = subs.RsrMgr.ResourceMgrs[subs.PonIntf].TechProfileMgr.
 			GetUsScheduler(subs.TpInstance[subs.TestConfig.TpIDList[0]])
 	}
 
+	if err != nil {
+		log.Errorw("Failed to create traffic schedulers", log.Fields{"direction": direction, "error": err})
+		return nil
+	}
+
 	// hard-code for now
 	cir := 16000
 	cbs := 5000
@@ -92,10 +130,15 @@
 
 func getTrafficQueues(subs *Subscriber, direction tp_pb.Direction) []*tp_pb.TrafficQueue {
 
-	trafficQueues := subs.RsrMgr.ResourceMgrs[subs.PonIntf].TechProfileMgr.
+	trafficQueues, err := subs.RsrMgr.ResourceMgrs[subs.PonIntf].TechProfileMgr.
 		GetTrafficQueues(subs.TpInstance[subs.TestConfig.TpIDList[0]], direction)
 
-	return trafficQueues
+	if err == nil {
+		return trafficQueues
+	}
+
+	log.Errorw("Failed to create traffic queues", log.Fields{"direction": direction, "error": err})
+	return nil
 }
 
 func FormatClassfierAction(flowType string, direction string, subs *Subscriber) (oop.Classifier, oop.Action) {
@@ -202,7 +245,7 @@
 	var flowID []uint32
 	var err error
 
-	if flowID, err = rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].GetResourceID(uint32(config.NniIntfID),
+	if flowID, err = rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].GetResourceID(context.Background(), uint32(config.NniIntfID),
 		ponresourcemanager.FLOW_ID, 1); err != nil {
 		return err
 	}
@@ -226,7 +269,7 @@
 
 	if err != nil {
 		log.Errorw("Failed to Add LLDP flow to device", log.Fields{"err": err, "deviceFlow": flow})
-		rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].FreeResourceID(uint32(config.NniIntfID),
+		rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].FreeResourceID(context.Background(), uint32(config.NniIntfID),
 			ponresourcemanager.FLOW_ID, flowID)
 		return err
 	}
@@ -234,3 +277,263 @@
 
 	return nil
 }
+
+// GenerateMac returns a 6-byte (MacSize) MAC address: random octets in the
+// inclusive range MacMin..MacMax when isRand is true, otherwise the fixed
+// address 12:AB:34:CD:56:EF.
+func GenerateMac(isRand bool) []byte {
+	if !isRand {
+		return []byte{0x12, 0xAB, 0x34, 0xCD, 0x56, 0xEF}
+	}
+	mac := make([]byte, MacSize)
+	for i := range mac {
+		mac[i] = byte(rand.Intn(MacMax-MacMin+1) + MacMin) // +1: Intn excludes its bound, MacMax must be reachable
+	}
+	return mac
+}
+
+// GenerateMulticastMac builds a 6-byte MAC that starts with the 01:00:5E
+// prefix, followed by onu_id%255, one random octet in MacMin..MacMax
+// (inclusive) and the low byte of group_id.
+// NOTE(review): octet 4 can exceed 0x7F, outside the IPv4 multicast MAC
+// range (01:00:5E:00:00:00-01:00:5E:7F:FF:FF) — confirm the device accepts it.
+func GenerateMulticastMac(onu_id uint32, group_id uint32) []byte {
+	return []byte{
+		0x01, 0x00, 0x5E,
+		byte(onu_id % 255), byte(rand.Intn(MacMax-MacMin+1) + MacMin), byte(group_id),
+	}
+}
+
+// PerformGroupOperation sends groupCfg to the device through the OpenOLT
+// client of the group's subscriber. A failure is logged and returned with a
+// nil response; on success the device's reply is returned.
+func PerformGroupOperation(grp *GroupData, groupCfg *oop.Group) (*oop.Empty, error) {
+	client := grp.Subs.OpenOltClient
+
+	reply, err := client.PerformGroupOperation(context.Background(), groupCfg)
+	if err != nil {
+		log.Errorw("Failed to perform - PerformGroupOperation()", log.Fields{"err": err})
+		return nil, err
+	}
+
+	log.Info("Successfully called - PerformGroupOperation()")
+	return reply, nil
+}
+
+// CreateGroup issues a SET_MEMBERS group command with an empty member list
+// for grp.GroupID and returns the result of PerformGroupOperation.
+func CreateGroup(grp *GroupData) (*oop.Empty, error) {
+	log.Infow("creating group", log.Fields{"GroupID": grp.GroupID})
+	request := &oop.Group{
+		Command: oop.Group_SET_MEMBERS,
+		GroupId: grp.GroupID,
+	}
+	return PerformGroupOperation(grp, request)
+}
+
+// OpMulticastTrafficQueue creates (isCreating == true) or removes the
+// downstream multicast traffic queue of a group on the device.
+//
+// The request addresses the PON/ONU/UNI of the group's subscriber and holds a
+// single downstream queue built from the group's priority, weight, gem port
+// and scheduling policy. On failure the error is logged and returned with a
+// nil response; otherwise the device's reply is returned.
+func OpMulticastTrafficQueue(grp *GroupData, isCreating bool) (*oop.Empty, error) {
+	log.Infow("operating on multicast traffic queue", log.Fields{"Creating": isCreating, "GroupID": grp.GroupID})
+
+	client := grp.Subs.OpenOltClient
+	request := &tp_pb.TrafficQueues{
+		IntfId: grp.Subs.PonIntf,
+		OnuId:  grp.Subs.OnuID,
+		UniId:  grp.Subs.UniID,
+		TrafficQueues: []*tp_pb.TrafficQueue{{
+			Direction:   tp_pb.Direction_DOWNSTREAM,
+			Priority:    grp.Priority,
+			Weight:      grp.Weight,
+			GemportId:   grp.GemPortID,
+			SchedPolicy: grp.SchedPolicy,
+		}},
+	}
+
+	if !isCreating {
+		reply, err := client.RemoveTrafficQueues(context.Background(), request)
+		if err != nil {
+			log.Errorw("Failed to perform - RemoveTrafficQueues()", log.Fields{"err": err})
+			return nil, err
+		}
+
+		log.Info("Successfully called - RemoveTrafficQueues()")
+		return reply, nil
+	}
+
+	reply, err := client.CreateTrafficQueues(context.Background(), request)
+	if err != nil {
+		log.Errorw("Failed to perform - CreateTrafficQueues()", log.Fields{"err": err})
+		return nil, err
+	}
+
+	log.Info("Successfully called - CreateTrafficQueues()")
+	return reply, nil
+}
+
+// AddMulticastFlow installs the multicast flow of a group on the device.
+//
+// A flow ID is reserved from the resource manager of the NNI interface
+// (TestConfig.NniIntfID). The flow classifier is double-tagged and matches a
+// destination MAC from GenerateMulticastMac. A device reply of AlreadyExists
+// is treated as success. On any other failure the flow ID is returned to the
+// same NNI resource manager it was reserved from, and the error is returned.
+func AddMulticastFlow(grp *GroupData) error {
+	log.Infow("add multicast flow", log.Fields{"GroupID": grp.GroupID})
+
+	oo := grp.Subs.OpenOltClient
+	nniIntfID := uint32(grp.Subs.TestConfig.NniIntfID)
+	// Reserve and (on failure) release the flow ID through the same manager.
+	nniRsrMgr := grp.Subs.RsrMgr.ResourceMgrs[nniIntfID]
+
+	flowID, err := nniRsrMgr.GetResourceID(context.Background(), nniIntfID, ponresourcemanager.FLOW_ID, 1)
+	if err != nil {
+		return err
+	}
+
+	flowClassifier := &oop.Classifier{
+		IPbits:     255,
+		OPbits:     255,
+		IVid:       55,
+		OVid:       255,
+		DstMac:     GenerateMulticastMac(grp.Subs.OnuID, grp.GroupID),
+		PktTagType: DoubleTag}
+
+	flow := oop.Flow{AccessIntfId: int32(grp.Subs.PonIntf), OnuId: int32(grp.Subs.OnuID), UniId: int32(grp.Subs.UniID), FlowId: flowID[0],
+		FlowType: "multicast", AllocId: int32(grp.AllocID), GemportId: int32(grp.GemPortID),
+		Classifier: flowClassifier, Priority: int32(grp.Priority), PortNo: uint32(grp.Subs.UniPortNo), GroupId: uint32(grp.GroupID)}
+
+	_, err = oo.FlowAdd(context.Background(), &flow)
+	if st, _ := status.FromError(err); st.Code() == codes.AlreadyExists {
+		log.Debugw("Flow already exists", log.Fields{"err": err, "deviceFlow": flow})
+		return nil
+	}
+	if err != nil {
+		log.Errorw("Failed to add multicast flow to device", log.Fields{"err": err, "deviceFlow": flow})
+		nniRsrMgr.FreeResourceID(context.Background(), nniIntfID, ponresourcemanager.FLOW_ID, flowID)
+		return err
+	}
+
+	log.Debugw("Multicast flow added to device successfully ", log.Fields{"flow": flow})
+	return nil
+}
+
+// AddMulticastSched creates the downstream scheduler for a group's multicast
+// traffic on the PON of the group's subscriber.
+//
+// The scheduler config is best-effort WRR with the group's priority and
+// weight; the shaping values are hard-coded. Returns an error carrying the
+// SCHED_CREATION_FAILED reason string when the tech profile manager yields no
+// scheduler or the CreateTrafficSchedulers request fails.
+func AddMulticastSched(grp *GroupData) error {
+	log.Infow("creating multicast sched", log.Fields{"GroupID": grp.GroupID})
+
+	schedCfg := &tp_pb.SchedulerConfig{
+		Direction:    tp_pb.Direction_DOWNSTREAM,
+		AdditionalBw: tp_pb.AdditionalBW_AdditionalBW_BestEffort,
+		Priority:     grp.Priority,
+		Weight:       grp.Weight,
+		SchedPolicy:  tp_pb.SchedulingPolicy_WRR}
+
+	// hard-code for now; peak values are the committed plus the excess values
+	const cir, cbs, eir, ebs uint32 = 1948, 31768, 100, 1000
+	shaping := &tp_pb.TrafficShapingInfo{Cir: cir, Cbs: cbs, Pir: cir + eir, Pbs: cbs + ebs}
+
+	sched := grp.Subs.RsrMgr.ResourceMgrs[grp.Subs.PonIntf].TechProfileMgr.
+		GetTrafficScheduler(grp.Subs.TpInstance[grp.Subs.TestConfig.TpIDList[0]], schedCfg, shaping)
+	// Test the returned scheduler itself: a slice literal wrapping it is never nil.
+	if sched == nil {
+		log.Error("Create scheduler for multicast traffic failed")
+		return errors.New(ReasonCodeToReasonString(SCHED_CREATION_FAILED))
+	}
+	trafficScheds := []*tp_pb.TrafficScheduler{sched}
+
+	log.Debugw("Sending Traffic scheduler create to device",
+		log.Fields{"Direction": tp_pb.Direction_DOWNSTREAM, "TrafficScheds": trafficScheds})
+	if _, err := grp.Subs.OpenOltClient.CreateTrafficSchedulers(context.Background(), &tp_pb.TrafficSchedulers{
+		IntfId: grp.Subs.PonIntf, OnuId: grp.Subs.OnuID,
+		UniId: grp.Subs.UniID, PortNo: grp.Subs.UniPortNo,
+		TrafficScheds: trafficScheds}); err != nil {
+		log.Errorw("Failed to create traffic schedulers", log.Fields{"error": err})
+		return errors.New(ReasonCodeToReasonString(SCHED_CREATION_FAILED))
+	}
+	return nil
+}
+
+// OpMemberToGroup adds (isAdding == true) or removes a single PON member of
+// the group grp.GroupID through PerformGroupOperation.
+//
+// The member is described by the subscriber's PON interface together with
+// the group's gem port and priority.
+func OpMemberToGroup(grp *GroupData, isAdding bool) (*oop.Empty, error) {
+	log.Infow("operating on group", log.Fields{"Adding": isAdding})
+
+	command := oop.Group_REMOVE_MEMBERS
+	if isAdding {
+		command = oop.Group_ADD_MEMBERS
+	}
+
+	member := &oop.GroupMember{
+		// SchedPolicy is not populated for the member.
+		InterfaceId:   grp.Subs.PonIntf,
+		GemPortId:     grp.GemPortID,
+		Priority:      grp.Priority,
+		InterfaceType: oop.GroupMember_PON,
+	}
+	request := &oop.Group{
+		Command: command,
+		GroupId: grp.GroupID,
+		Members: []*oop.GroupMember{member},
+	}
+
+	return PerformGroupOperation(grp, request)
+}
+
+// AddMulticastQueueFlow runs the multicast provisioning steps selected by the
+// Add* flags of grp, always in this order: group, flow, scheduler, queue,
+// member. The first failing step is logged and its error returned; later
+// steps are then skipped. Returns nil when every selected step succeeds.
+func AddMulticastQueueFlow(grp *GroupData) error {
+	log.Debugw("Create multicast queue flow", log.Fields{"GroupID": grp.GroupID, "AddGroup": grp.AddGroup,
+		"AddFlow": grp.AddFlow, "AddSched": grp.AddSched, "AddQueue": grp.AddQueue, "AddMember": grp.AddMember})
+
+	steps := []struct {
+		selected bool
+		failure  string
+		run      func() error
+	}{
+		{grp.AddGroup, "Failed to add group to device", func() error {
+			_, err := CreateGroup(grp)
+			return err
+		}},
+		{grp.AddFlow, "Failed to add multicast flow to device", func() error {
+			return AddMulticastFlow(grp)
+		}},
+		{grp.AddSched, "Failed to add multicast sched to device", func() error {
+			return AddMulticastSched(grp)
+		}},
+		{grp.AddQueue, "Failed to add multicast queue to device", func() error {
+			_, err := OpMulticastTrafficQueue(grp, true)
+			return err
+		}},
+		{grp.AddMember, "Failed to add member to group", func() error {
+			_, err := OpMemberToGroup(grp, true)
+			return err
+		}},
+	}
+	for _, step := range steps {
+		if !step.selected {
+			continue
+		}
+		if err := step.run(); err != nil {
+			log.Error(step.failure)
+			return err
+		}
+	}
+	return nil
+}