VOL-3419: Replicate voltha flows in openolt agent
- The scale-tester-app will adhere to new openolt.proto interface (version 4.0.2)
and will pass necessary information for openolt-agent to replicate the flows.
- upgrade to voltha-lib-go version 4.0.0

Change-Id: I9d862929ae8ac4468d4e93096f8cd8e16f26ec93
diff --git a/core/att_workflow.go b/core/att_workflow.go
index f673ee0..d98cbeb 100644
--- a/core/att_workflow.go
+++ b/core/att_workflow.go
@@ -21,24 +21,20 @@
 	"strings"
 
 	"github.com/opencord/openolt-scale-tester/config"
-	"github.com/opencord/voltha-lib-go/v3/pkg/log"
-	oop "github.com/opencord/voltha-protos/v3/go/openolt"
-	tp_pb "github.com/opencord/voltha-protos/v3/go/tech_profile"
+	"github.com/opencord/voltha-lib-go/v4/pkg/log"
+	oop "github.com/opencord/voltha-protos/v4/go/openolt"
+	tp_pb "github.com/opencord/voltha-protos/v4/go/tech_profile"
 	"golang.org/x/net/context"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 )
 
-func init() {
-	_, _ = log.AddPackage(log.JSON, log.DebugLevel, nil)
-}
-
 // A dummy struct to comply with the WorkFlow interface.
 type AttWorkFlow struct {
 }
 
 func AddDhcpIPV4Flow(oo oop.OpenoltClient, config *config.OpenOltScaleTesterConfig, rsrMgr *OpenOltResourceMgr) error {
-	var flowID uint32
+	var flowID uint64
 	var err error
 
 	if flowID, err = rsrMgr.GetFlowID(context.Background(), uint32(config.NniIntfID)); err != nil {
@@ -59,21 +55,21 @@
 
 	st, _ := status.FromError(err)
 	if st.Code() == codes.AlreadyExists {
-		log.Debugw("Flow already exists", log.Fields{"err": err, "deviceFlow": flow})
+		logger.Debugw(nil, "Flow already exists", log.Fields{"err": err, "deviceFlow": flow})
 		return nil
 	}
 
 	if err != nil {
-		log.Errorw("Failed to Add DHCP IPv4 to device", log.Fields{"err": err, "deviceFlow": flow})
+		logger.Errorw(nil, "Failed to Add DHCP IPv4 to device", log.Fields{"err": err, "deviceFlow": flow})
 		return err
 	}
-	log.Debugw("DHCP IPV4 added to device successfully ", log.Fields{"flow": flow})
+	logger.Debugw(nil, "DHCP IPV4 added to device successfully ", log.Fields{"flow": flow})
 
 	return nil
 }
 
 func AddDhcpIPV6Flow(oo oop.OpenoltClient, config *config.OpenOltScaleTesterConfig, rsrMgr *OpenOltResourceMgr) error {
-	var flowID uint32
+	var flowID uint64
 	var err error
 
 	if flowID, err = rsrMgr.GetFlowID(context.Background(), uint32(config.NniIntfID)); err != nil {
@@ -94,15 +90,15 @@
 
 	st, _ := status.FromError(err)
 	if st.Code() == codes.AlreadyExists {
-		log.Debugw("Flow already exists", log.Fields{"err": err, "deviceFlow": flow})
+		logger.Debugw(nil, "Flow already exists", log.Fields{"err": err, "deviceFlow": flow})
 		return nil
 	}
 
 	if err != nil {
-		log.Errorw("Failed to Add DHCP IPV6 to device", log.Fields{"err": err, "deviceFlow": flow})
+		logger.Errorw(nil, "Failed to Add DHCP IPV6 to device", log.Fields{"err": err, "deviceFlow": flow})
 		return err
 	}
-	log.Debugw("DHCP IPV6 added to device successfully ", log.Fields{"flow": flow})
+	logger.Debugw(nil, "DHCP IPV6 added to device successfully ", log.Fields{"flow": flow})
 
 	return nil
 }
@@ -118,75 +114,75 @@
 func (att AttWorkFlow) ProvisionScheds(subs *Subscriber) error {
 	var trafficSched []*tp_pb.TrafficScheduler
 
-	log.Info("provisioning-scheds")
+	logger.Info(nil, "provisioning-scheds")
 
 	if trafficSched = getTrafficSched(subs, tp_pb.Direction_DOWNSTREAM); trafficSched == nil {
-		log.Error("ds-traffic-sched-is-nil")
+		logger.Error(nil, "ds-traffic-sched-is-nil")
 		return errors.New(ReasonCodeToReasonString(SCHED_CREATION_FAILED))
 	}
 
-	log.Debugw("Sending Traffic scheduler create to device",
+	logger.Debugw(nil, "Sending Traffic scheduler create to device",
 		log.Fields{"Direction": tp_pb.Direction_DOWNSTREAM, "TrafficScheds": trafficSched})
 	if _, err := subs.OpenOltClient.CreateTrafficSchedulers(context.Background(), &tp_pb.TrafficSchedulers{
 		IntfId: subs.PonIntf, OnuId: subs.OnuID,
 		UniId: subs.UniID, PortNo: subs.UniPortNo,
 		TrafficScheds: trafficSched}); err != nil {
-		log.Errorw("Failed to create traffic schedulers", log.Fields{"error": err})
+		logger.Errorw(nil, "Failed to create traffic schedulers", log.Fields{"error": err})
 		return errors.New(ReasonCodeToReasonString(SCHED_CREATION_FAILED))
 	}
 
 	if trafficSched = getTrafficSched(subs, tp_pb.Direction_UPSTREAM); trafficSched == nil {
-		log.Error("us-traffic-sched-is-nil")
+		logger.Error(nil, "us-traffic-sched-is-nil")
 		return errors.New(ReasonCodeToReasonString(SCHED_CREATION_FAILED))
 	}
 
-	log.Debugw("Sending Traffic scheduler create to device",
+	logger.Debugw(nil, "Sending Traffic scheduler create to device",
 		log.Fields{"Direction": tp_pb.Direction_UPSTREAM, "TrafficScheds": trafficSched})
 	if _, err := subs.OpenOltClient.CreateTrafficSchedulers(context.Background(), &tp_pb.TrafficSchedulers{
 		IntfId: subs.PonIntf, OnuId: subs.OnuID,
 		UniId: subs.UniID, PortNo: subs.UniPortNo,
 		TrafficScheds: trafficSched}); err != nil {
-		log.Errorw("Failed to create traffic schedulers", log.Fields{"error": err})
+		logger.Errorw(nil, "Failed to create traffic schedulers", log.Fields{"error": err})
 		return errors.New(ReasonCodeToReasonString(SCHED_CREATION_FAILED))
 	}
 	return nil
 }
 
 func (att AttWorkFlow) ProvisionQueues(subs *Subscriber) error {
-	log.Info("provisioning-queues")
+	logger.Info(nil, "provisioning-queues")
 
 	var trafficQueues []*tp_pb.TrafficQueue
 	if trafficQueues = getTrafficQueues(subs, tp_pb.Direction_DOWNSTREAM); trafficQueues == nil {
-		log.Error("Failed to create traffic queues")
+		logger.Error(nil, "Failed to create traffic queues")
 		return errors.New(ReasonCodeToReasonString(QUEUE_CREATION_FAILED))
 	}
 
 	// On receiving the CreateTrafficQueues request, the driver should create corresponding
 	// downstream queues.
-	log.Debugw("Sending Traffic Queues create to device",
+	logger.Debugw(nil, "Sending Traffic Queues create to device",
 		log.Fields{"Direction": tp_pb.Direction_DOWNSTREAM, "TrafficQueues": trafficQueues})
 	if _, err := subs.OpenOltClient.CreateTrafficQueues(context.Background(),
 		&tp_pb.TrafficQueues{IntfId: subs.PonIntf, OnuId: subs.OnuID,
 			UniId: subs.UniID, PortNo: subs.UniPortNo,
 			TrafficQueues: trafficQueues}); err != nil {
-		log.Errorw("Failed to create traffic queues in device", log.Fields{"error": err})
+		logger.Errorw(nil, "Failed to create traffic queues in device", log.Fields{"error": err})
 		return errors.New(ReasonCodeToReasonString(QUEUE_CREATION_FAILED))
 	}
 
 	if trafficQueues = getTrafficQueues(subs, tp_pb.Direction_UPSTREAM); trafficQueues == nil {
-		log.Error("Failed to create traffic queues")
+		logger.Error(nil, "Failed to create traffic queues")
 		return errors.New(ReasonCodeToReasonString(QUEUE_CREATION_FAILED))
 	}
 
 	// On receiving the CreateTrafficQueues request, the driver should create corresponding
 	// upstream queues.
-	log.Debugw("Sending Traffic Queues create to device",
+	logger.Debugw(nil, "Sending Traffic Queues create to device",
 		log.Fields{"Direction": tp_pb.Direction_UPSTREAM, "TrafficQueues": trafficQueues})
 	if _, err := subs.OpenOltClient.CreateTrafficQueues(context.Background(),
 		&tp_pb.TrafficQueues{IntfId: subs.PonIntf, OnuId: subs.OnuID,
 			UniId: subs.UniID, PortNo: subs.UniPortNo,
 			TrafficQueues: trafficQueues}); err != nil {
-		log.Errorw("Failed to create traffic queues in device", log.Fields{"error": err})
+		logger.Errorw(nil, "Failed to create traffic queues in device", log.Fields{"error": err})
 		return errors.New(ReasonCodeToReasonString(QUEUE_CREATION_FAILED))
 	}
 
@@ -195,8 +191,9 @@
 
 func (att AttWorkFlow) ProvisionEapFlow(subs *Subscriber) error {
 	var err error
-	var flowID uint32
+	var flowID uint64
 	var gemPortIDs []uint32
+	pbitToGem := make(map[uint32]uint32)
 
 	var allocID = subs.TpInstance[subs.TestConfig.TpIDList[0]].UsScheduler.AllocID
 	for _, gem := range subs.TpInstance[subs.TestConfig.TpIDList[0]].UpstreamGemPortAttributeList {
@@ -208,23 +205,27 @@
 		for pos, pbitSet := range strings.TrimPrefix(pBitMap, "0b") {
 			if pbitSet == '1' {
 				pcp := uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
-				if flowID, err = subs.RsrMgr.GetFlowID(context.Background(), uint32(subs.PonIntf)); err != nil {
-					return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
-				} else {
-					if err := AddFlow(subs, EapolFlow, Upstream, flowID, allocID, gemID, pcp); err != nil {
-						return err
-					}
-				}
+				pbitToGem[pcp] = gemID
 			}
 		}
 	}
+	// This flowID is not the BAL flow ID now, it is the voltha-flow-id
+	if flowID, err = subs.RsrMgr.GetFlowID(context.Background(), uint32(subs.PonIntf)); err != nil {
+		return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
+	}
+	if err := AddFlow(subs, EapolFlow, Upstream, flowID, allocID, 0, 0xff,
+		true, 0, pbitToGem); err != nil {
+		return err
+	}
+
 	return nil
 }
 
 func (att AttWorkFlow) ProvisionDhcpIPV4Flow(subs *Subscriber) error {
 	var err error
-	var flowID uint32
+	var flowID uint64
 	var gemPortIDs []uint32
+	pbitToGem := make(map[uint32]uint32)
 
 	var allocID = subs.TpInstance[subs.TestConfig.TpIDList[0]].UsScheduler.AllocID
 	for _, gem := range subs.TpInstance[subs.TestConfig.TpIDList[0]].UpstreamGemPortAttributeList {
@@ -236,23 +237,27 @@
 		for pos, pbitSet := range strings.TrimPrefix(pBitMap, "0b") {
 			if pbitSet == '1' {
 				pcp := uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
-				if flowID, err = subs.RsrMgr.GetFlowID(context.Background(), uint32(subs.PonIntf)); err != nil {
-					return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
-				} else {
-					if err := AddFlow(subs, DhcpFlowIPV4, Upstream, flowID, allocID, gemID, pcp); err != nil {
-						return err
-					}
-				}
+				pbitToGem[pcp] = gemID
 			}
 		}
 	}
+
+	// This flowID is not the BAL flow ID now, it is the voltha-flow-id
+	if flowID, err = subs.RsrMgr.GetFlowID(context.Background(), uint32(subs.PonIntf)); err != nil {
+		return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
+	}
+	if err := AddFlow(subs, DhcpFlowIPV4, Upstream, flowID, allocID, 0, 0xff,
+		true, 0, pbitToGem); err != nil {
+		return err
+	}
 	return nil
 }
 
 func (att AttWorkFlow) ProvisionDhcpIPV6Flow(subs *Subscriber) error {
 	var err error
-	var flowID uint32
+	var flowID uint64
 	var gemPortIDs []uint32
+	pbitToGem := make(map[uint32]uint32)
 
 	var allocID = subs.TpInstance[subs.TestConfig.TpIDList[0]].UsScheduler.AllocID
 	for _, gem := range subs.TpInstance[subs.TestConfig.TpIDList[0]].UpstreamGemPortAttributeList {
@@ -264,28 +269,32 @@
 		for pos, pbitSet := range strings.TrimPrefix(pBitMap, "0b") {
 			if pbitSet == '1' {
 				pcp := uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
-				if flowID, err = subs.RsrMgr.GetFlowID(context.Background(), uint32(subs.PonIntf)); err != nil {
-					return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
-				} else {
-					if err := AddFlow(subs, DhcpFlowIPV6, Upstream, flowID, allocID, gemID, pcp); err != nil {
-						return err
-					}
-				}
+				pbitToGem[pcp] = gemID
 			}
 		}
 	}
+
+	// This flowID is not the BAL flow ID now, it is the voltha-flow-id
+	if flowID, err = subs.RsrMgr.GetFlowID(context.Background(), uint32(subs.PonIntf)); err != nil {
+		return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
+	}
+	if err := AddFlow(subs, DhcpFlowIPV6, Upstream, flowID, allocID, 0, 0xff,
+		true, 0, pbitToGem); err != nil {
+		return err
+	}
 	return nil
 }
 
 func (att AttWorkFlow) ProvisionIgmpFlow(subs *Subscriber) error {
-	log.Info("att-workflow-does-not-support-igmp-yet--nothing-to-do")
+	logger.Info(nil, "att-workflow-does-not-support-igmp-yet--nothing-to-do")
 	return nil
 }
 
 func (att AttWorkFlow) ProvisionHsiaFlow(subs *Subscriber) error {
 	var err error
-	var flowID uint32
+	var flowIDUs, flowIDDs uint64
 	var gemPortIDs []uint32
+	pbitToGem := make(map[uint32]uint32)
 
 	var allocID = subs.TpInstance[subs.TestConfig.TpIDList[0]].UsScheduler.AllocID
 	for _, gem := range subs.TpInstance[subs.TestConfig.TpIDList[0]].UpstreamGemPortAttributeList {
@@ -297,49 +306,45 @@
 		for pos, pbitSet := range strings.TrimPrefix(pBitMap, "0b") {
 			if pbitSet == '1' {
 				pcp := uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
-				if flowID, err = subs.RsrMgr.GetFlowID(context.Background(), uint32(subs.PonIntf)); err != nil {
-					return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
-				} else {
-					var errUs, errDs error
-					if errUs = AddFlow(subs, HsiaFlow, Upstream, flowID, allocID, gemID, pcp); errUs != nil {
-						log.Errorw("failed to install US HSIA flow",
-							log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
-					}
-					if errDs = AddFlow(subs, HsiaFlow, Downstream, flowID, allocID, gemID, pcp); errDs != nil {
-						log.Errorw("failed to install US HSIA flow",
-							log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
-					}
-					if errUs != nil && errDs != nil {
-					}
-					if errUs != nil || errDs != nil {
-						if errUs != nil {
-							return errUs
-						}
-						return errDs
-					}
-				}
+				pbitToGem[pcp] = gemID
 			}
 		}
 	}
+
+	// This flowID is not the BAL flow ID now, it is the voltha-flow-id
+	if flowIDUs, err = subs.RsrMgr.GetFlowID(context.Background(), uint32(subs.PonIntf)); err != nil {
+		return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
+	}
+	if err := AddFlow(subs, HsiaFlow, Upstream, flowIDUs, allocID, 0, 0xff,
+		true, 0, pbitToGem); err != nil {
+		return err
+	}
+	if flowIDDs, err = subs.RsrMgr.GetFlowID(context.Background(), uint32(subs.PonIntf)); err != nil {
+		return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
+	}
+	if err := AddFlow(subs, HsiaFlow, Downstream, flowIDDs, allocID, 0, 0xff,
+		true, flowIDUs, pbitToGem); err != nil {
+		return err
+	}
 	return nil
 }
 
 func (att AttWorkFlow) ProvisionVoipFlow(subs *Subscriber) error {
-	log.Info("att-workflow-does-not-support-voip-yet--nothing-to-do")
+	logger.Info(nil, "att-workflow-does-not-support-voip-yet--nothing-to-do")
 	return nil
 }
 
 func (att AttWorkFlow) ProvisionVodFlow(subs *Subscriber) error {
-	log.Info("att-workflow-does-not-support-vod-yet--nothing-to-do")
+	logger.Info(nil, "att-workflow-does-not-support-vod-yet--nothing-to-do")
 	return nil
 }
 
 func (att AttWorkFlow) ProvisionMgmtFlow(subs *Subscriber) error {
-	log.Info("att-workflow-does-not-support-mgmt-yet--nothing-to-do")
+	logger.Info(nil, "att-workflow-does-not-support-mgmt-yet--nothing-to-do")
 	return nil
 }
 
 func (att AttWorkFlow) ProvisionMulticastFlow(subs *Subscriber) error {
-	log.Info("att-workflow-does-not-support-multicast-yet--nothing-to-do")
+	logger.Info(nil, "att-workflow-does-not-support-multicast-yet--nothing-to-do")
 	return nil
 }
diff --git a/core/common.go b/core/common.go
new file mode 100644
index 0000000..b03022e
--- /dev/null
+++ b/core/common.go
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//Package core Common Logger initialization
+package core
+
+import (
+	"github.com/opencord/voltha-lib-go/v4/pkg/log"
+)
+
+var logger log.CLogger
+
+func init() {
+	// Setup this package so that it's log level can be modified at run time
+	var err error
+	logger, err = log.RegisterPackage(log.JSON, log.DebugLevel, log.Fields{})
+	if err != nil {
+		panic(err)
+	}
+}
diff --git a/core/dt_workflow.go b/core/dt_workflow.go
index 5c8b345..97870ee 100644
--- a/core/dt_workflow.go
+++ b/core/dt_workflow.go
@@ -21,16 +21,12 @@
 	"strings"
 
 	"github.com/opencord/openolt-scale-tester/config"
-	"github.com/opencord/voltha-lib-go/v3/pkg/log"
-	oop "github.com/opencord/voltha-protos/v3/go/openolt"
-	tp_pb "github.com/opencord/voltha-protos/v3/go/tech_profile"
+	"github.com/opencord/voltha-lib-go/v4/pkg/log"
+	oop "github.com/opencord/voltha-protos/v4/go/openolt"
+	tp_pb "github.com/opencord/voltha-protos/v4/go/tech_profile"
 	"golang.org/x/net/context"
 )
 
-func init() {
-	_, _ = log.AddPackage(log.JSON, log.DebugLevel, nil)
-}
-
 // A dummy struct to comply with the WorkFlow interface.
 type DtWorkFlow struct {
 }
@@ -44,35 +40,35 @@
 func (dt DtWorkFlow) ProvisionScheds(subs *Subscriber) error {
 	var trafficSched []*tp_pb.TrafficScheduler
 
-	log.Info("provisioning-scheds")
+	logger.Info(nil, "provisioning-scheds")
 
 	if trafficSched = getTrafficSched(subs, tp_pb.Direction_DOWNSTREAM); trafficSched == nil {
-		log.Error("ds-traffic-sched-is-nil")
+		logger.Error(nil, "ds-traffic-sched-is-nil")
 		return errors.New(ReasonCodeToReasonString(SCHED_CREATION_FAILED))
 	}
 
-	log.Debugw("Sending Traffic scheduler create to device",
+	logger.Debugw(nil, "Sending Traffic scheduler create to device",
 		log.Fields{"Direction": tp_pb.Direction_DOWNSTREAM, "TrafficScheds": trafficSched})
 	if _, err := subs.OpenOltClient.CreateTrafficSchedulers(context.Background(), &tp_pb.TrafficSchedulers{
 		IntfId: subs.PonIntf, OnuId: subs.OnuID,
 		UniId: subs.UniID, PortNo: subs.UniPortNo,
 		TrafficScheds: trafficSched}); err != nil {
-		log.Errorw("Failed to create traffic schedulers", log.Fields{"error": err})
+		logger.Errorw(nil, "Failed to create traffic schedulers", log.Fields{"error": err})
 		return errors.New(ReasonCodeToReasonString(SCHED_CREATION_FAILED))
 	}
 
 	if trafficSched = getTrafficSched(subs, tp_pb.Direction_UPSTREAM); trafficSched == nil {
-		log.Error("us-traffic-sched-is-nil")
+		logger.Error(nil, "us-traffic-sched-is-nil")
 		return errors.New(ReasonCodeToReasonString(SCHED_CREATION_FAILED))
 	}
 
-	log.Debugw("Sending Traffic scheduler create to device",
+	logger.Debugw(nil, "Sending Traffic scheduler create to device",
 		log.Fields{"Direction": tp_pb.Direction_UPSTREAM, "TrafficScheds": trafficSched})
 	if _, err := subs.OpenOltClient.CreateTrafficSchedulers(context.Background(), &tp_pb.TrafficSchedulers{
 		IntfId: subs.PonIntf, OnuId: subs.OnuID,
 		UniId: subs.UniID, PortNo: subs.UniPortNo,
 		TrafficScheds: trafficSched}); err != nil {
-		log.Errorw("Failed to create traffic schedulers", log.Fields{"error": err})
+		logger.Errorw(nil, "Failed to create traffic schedulers", log.Fields{"error": err})
 		return errors.New(ReasonCodeToReasonString(SCHED_CREATION_FAILED))
 	}
 
@@ -80,40 +76,40 @@
 }
 
 func (dt DtWorkFlow) ProvisionQueues(subs *Subscriber) error {
-	log.Info("provisioning-queues")
+	logger.Info(nil, "provisioning-queues")
 
 	var trafficQueues []*tp_pb.TrafficQueue
 	if trafficQueues = getTrafficQueues(subs, tp_pb.Direction_DOWNSTREAM); trafficQueues == nil {
-		log.Error("Failed to create traffic queues")
+		logger.Error(nil, "Failed to create traffic queues")
 		return errors.New(ReasonCodeToReasonString(QUEUE_CREATION_FAILED))
 	}
 
 	// On receiving the CreateTrafficQueues request, the driver should create corresponding
 	// downstream queues.
-	log.Debugw("Sending Traffic Queues create to device",
+	logger.Debugw(nil, "Sending Traffic Queues create to device",
 		log.Fields{"Direction": tp_pb.Direction_DOWNSTREAM, "TrafficQueues": trafficQueues})
 	if _, err := subs.OpenOltClient.CreateTrafficQueues(context.Background(),
 		&tp_pb.TrafficQueues{IntfId: subs.PonIntf, OnuId: subs.OnuID,
 			UniId: subs.UniID, PortNo: subs.UniPortNo,
 			TrafficQueues: trafficQueues}); err != nil {
-		log.Errorw("Failed to create traffic queues in device", log.Fields{"error": err})
+		logger.Errorw(nil, "Failed to create traffic queues in device", log.Fields{"error": err})
 		return errors.New(ReasonCodeToReasonString(QUEUE_CREATION_FAILED))
 	}
 
 	if trafficQueues = getTrafficQueues(subs, tp_pb.Direction_UPSTREAM); trafficQueues == nil {
-		log.Error("Failed to create traffic queues")
+		logger.Error(nil, "Failed to create traffic queues")
 		return errors.New(ReasonCodeToReasonString(QUEUE_CREATION_FAILED))
 	}
 
 	// On receiving the CreateTrafficQueues request, the driver should create corresponding
 	// upstream queues.
-	log.Debugw("Sending Traffic Queues create to device",
+	logger.Debugw(nil, "Sending Traffic Queues create to device",
 		log.Fields{"Direction": tp_pb.Direction_UPSTREAM, "TrafficQueues": trafficQueues})
 	if _, err := subs.OpenOltClient.CreateTrafficQueues(context.Background(),
 		&tp_pb.TrafficQueues{IntfId: subs.PonIntf, OnuId: subs.OnuID,
 			UniId: subs.UniID, PortNo: subs.UniPortNo,
 			TrafficQueues: trafficQueues}); err != nil {
-		log.Errorw("Failed to create traffic queues in device", log.Fields{"error": err})
+		logger.Errorw(nil, "Failed to create traffic queues in device", log.Fields{"error": err})
 		return errors.New(ReasonCodeToReasonString(QUEUE_CREATION_FAILED))
 	}
 
@@ -121,29 +117,30 @@
 }
 
 func (dt DtWorkFlow) ProvisionEapFlow(subs *Subscriber) error {
-	log.Info("dt-workflow-does-not-require-eap-support--nothing-to-do")
+	logger.Info(nil, "dt-workflow-does-not-require-eap-support--nothing-to-do")
 	return nil
 }
 
 func (dt DtWorkFlow) ProvisionDhcpIPV4Flow(subs *Subscriber) error {
-	log.Info("dt-workflow-does-not-require-dhcp-ipv4-support--nothing-to-do")
+	logger.Info(nil, "dt-workflow-does-not-require-dhcp-ipv4-support--nothing-to-do")
 	return nil
 }
 
 func (dt DtWorkFlow) ProvisionDhcpIPV6Flow(subs *Subscriber) error {
-	log.Info("dt-workflow-does-not-require-dhcp-ipv6-support--nothing-to-do")
+	logger.Info(nil, "dt-workflow-does-not-require-dhcp-ipv6-support--nothing-to-do")
 	return nil
 }
 
 func (dt DtWorkFlow) ProvisionIgmpFlow(subs *Subscriber) error {
-	log.Info("dt-workflow-does-not-support-igmp-yet--nothing-to-do")
+	logger.Info(nil, "dt-workflow-does-not-support-igmp-yet--nothing-to-do")
 	return nil
 }
 
 func (dt DtWorkFlow) ProvisionHsiaFlow(subs *Subscriber) error {
 	var err error
-	var flowID uint32
+	var flowIDUs, flowIDDs uint64
 	var gemPortIDs []uint32
+	pbitToGem := make(map[uint32]uint32)
 
 	var allocID = subs.TpInstance[subs.TestConfig.TpIDList[0]].UsScheduler.AllocID
 	for _, gem := range subs.TpInstance[subs.TestConfig.TpIDList[0]].UpstreamGemPortAttributeList {
@@ -155,38 +152,46 @@
 		for pos, pbitSet := range strings.TrimPrefix(pBitMap, "0b") {
 			if pbitSet == '1' {
 				pcp := uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
-				if flowID, err = subs.RsrMgr.GetFlowID(context.Background(), uint32(subs.PonIntf)); err != nil {
-					return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
-				} else {
-					if err := AddFlow(subs, HsiaFlow, Upstream, flowID, allocID, gemID, pcp); err != nil {
-						return err
-					}
-					if err := AddFlow(subs, HsiaFlow, Downstream, flowID, allocID, gemID, pcp); err != nil {
-						return err
-					}
-				}
+				pbitToGem[pcp] = gemID
 			}
 		}
 	}
+
+	// This flowID is not the BAL flow ID now, it is the voltha-flow-id
+	if flowIDUs, err = subs.RsrMgr.GetFlowID(context.Background(), uint32(subs.PonIntf)); err != nil {
+		return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
+	}
+	if err := AddFlow(subs, HsiaFlow, Upstream, flowIDUs, allocID, 0, 0xff,
+		true, 0, pbitToGem); err != nil {
+		return err
+	}
+	if flowIDDs, err = subs.RsrMgr.GetFlowID(context.Background(), uint32(subs.PonIntf)); err != nil {
+		return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
+	}
+	if err := AddFlow(subs, HsiaFlow, Downstream, flowIDDs, allocID, 0, 0xff,
+		true, flowIDUs, pbitToGem); err != nil {
+		return err
+	}
+
 	return nil
 }
 
 func (dt DtWorkFlow) ProvisionVoipFlow(subs *Subscriber) error {
-	log.Info("dt-workflow-does-not-support-voip-yet--nothing-to-do")
+	logger.Info(nil, "dt-workflow-does-not-support-voip-yet--nothing-to-do")
 	return nil
 }
 
 func (dt DtWorkFlow) ProvisionVodFlow(subs *Subscriber) error {
-	log.Info("dt-workflow-does-not-support-vod-yet--nothing-to-do")
+	logger.Info(nil, "dt-workflow-does-not-support-vod-yet--nothing-to-do")
 	return nil
 }
 
 func (dt DtWorkFlow) ProvisionMgmtFlow(subs *Subscriber) error {
-	log.Info("dt-workflow-does-not-support-mgmt-yet--nothing-to-do")
+	logger.Info(nil, "dt-workflow-does-not-support-mgmt-yet--nothing-to-do")
 	return nil
 }
 
 func (dt DtWorkFlow) ProvisionMulticastFlow(subs *Subscriber) error {
-	log.Info("dt-workflow-does-not-support-multicast-yet--nothing-to-do")
+	logger.Info(nil, "dt-workflow-does-not-support-multicast-yet--nothing-to-do")
 	return nil
 }
diff --git a/core/olt_manager.go b/core/olt_manager.go
index f4e3143..ab171ef 100644
--- a/core/olt_manager.go
+++ b/core/olt_manager.go
@@ -32,10 +32,10 @@
 
 	"github.com/cenkalti/backoff/v3"
 	"github.com/opencord/openolt-scale-tester/config"
-	"github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
-	"github.com/opencord/voltha-lib-go/v3/pkg/log"
-	"github.com/opencord/voltha-lib-go/v3/pkg/techprofile"
-	oop "github.com/opencord/voltha-protos/v3/go/openolt"
+	"github.com/opencord/voltha-lib-go/v4/pkg/db/kvstore"
+	"github.com/opencord/voltha-lib-go/v4/pkg/log"
+	"github.com/opencord/voltha-lib-go/v4/pkg/techprofile"
+	oop "github.com/opencord/voltha-protos/v4/go/openolt"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
@@ -52,28 +52,24 @@
 }
 
 type OpenOltManager struct {
-	ipPort        string
-	deviceInfo    *oop.DeviceInfo
-	OnuDeviceMap  map[OnuDeviceKey]*OnuDevice `json:"onuDeviceMap"`
-	TechProfile   map[uint32]*techprofile.TechProfileIf
-	clientConn    *grpc.ClientConn
-	openOltClient oop.OpenoltClient
-	testConfig    *config.OpenOltScaleTesterConfig
-	rsrMgr        *OpenOltResourceMgr
-	lockRsrAlloc  sync.RWMutex
+	ipPort             string
+	deviceInfo         *oop.DeviceInfo
+	OnuDeviceMap       map[OnuDeviceKey]*OnuDevice `json:"onuDeviceMap"`
+	TechProfile        map[uint32]*techprofile.TechProfileIf
+	clientConn         *grpc.ClientConn
+	openOltClient      oop.OpenoltClient
+	testConfig         *config.OpenOltScaleTesterConfig
+	rsrMgr             *OpenOltResourceMgr
+	lockRsrAlloc       sync.RWMutex
 	lockOpenOltManager sync.RWMutex
 }
 
-func init() {
-	_, _ = log.AddPackage(log.JSON, log.DebugLevel, nil)
-}
-
 func NewOpenOltManager(ipPort string) *OpenOltManager {
-	log.Infow("initialized openolt manager with ipPort", log.Fields{"ipPort": ipPort})
+	logger.Infow(nil, "initialized openolt manager with ipPort", log.Fields{"ipPort": ipPort})
 	return &OpenOltManager{
-		ipPort:       ipPort,
-		OnuDeviceMap: make(map[OnuDeviceKey]*OnuDevice),
-		lockRsrAlloc: sync.RWMutex{},
+		ipPort:             ipPort,
+		OnuDeviceMap:       make(map[OnuDeviceKey]*OnuDevice),
+		lockRsrAlloc:       sync.RWMutex{},
 		lockOpenOltManager: sync.RWMutex{},
 	}
 }
@@ -83,9 +79,9 @@
 	var err error
 	// Verify that etcd is up before starting the application.
 	etcdIpPort := "http://" + om.testConfig.KVStoreHost + ":" + strconv.Itoa(om.testConfig.KVStorePort)
-	client, err := kvstore.NewEtcdClient(etcdIpPort, 5)
+	client, err := kvstore.NewEtcdClient(context.Background(), etcdIpPort, 5*time.Second, log.FatalLevel)
 	if err != nil || client == nil {
-		log.Fatal("error-initializing-etcd-client")
+		logger.Fatal(nil, "error-initializing-etcd-client")
 		return
 	}
 
@@ -98,40 +94,40 @@
 		jsonFile, err := os.Open(tpFilePath)
 		// if we os.Open returns an error then handle it
 		if err != nil {
-			log.Fatalw("could-not-find-tech-profile", log.Fields{"err": err, "tpFile": tpFilePath})
+			logger.Fatalw(nil, "could-not-find-tech-profile", log.Fields{"err": err, "tpFile": tpFilePath})
 		}
-		log.Debugw("tp-file-opened-successfully", log.Fields{"tpFile": tpFilePath})
+		logger.Debugw(nil, "tp-file-opened-successfully", log.Fields{"tpFile": tpFilePath})
 
 		// read our opened json file as a byte array.
 		if byteValue, err = ioutil.ReadAll(jsonFile); err != nil {
-			log.Fatalw("could-not-read-tp-file", log.Fields{"err": err, "tpFile": tpFilePath})
+			logger.Fatalw(nil, "could-not-read-tp-file", log.Fields{"err": err, "tpFile": tpFilePath})
 		}
 
 		var tp techprofile.TechProfile
 
 		if err = json.Unmarshal(byteValue, &tp); err != nil {
-			log.Fatalw("could-not-unmarshal-tp", log.Fields{"err": err, "tpFile": tpFilePath})
+			logger.Fatalw(nil, "could-not-unmarshal-tp", log.Fields{"err": err, "tpFile": tpFilePath})
 		} else {
-			log.Infow("tp-read-from-file", log.Fields{"tp": tp, "tpFile": tpFilePath})
+			logger.Infow(nil, "tp-read-from-file", log.Fields{"tp": tp, "tpFile": tpFilePath})
 		}
 		kvPath := fmt.Sprintf(TechProfileKVPath, om.deviceInfo.Technology, tpID)
 		tpJson, err := json.Marshal(tp)
 		err = client.Put(context.Background(), kvPath, tpJson)
 		if err != nil {
-			log.Fatalw("tp-put-to-etcd-failed", log.Fields{"tpPath": kvPath, "err": err})
+			logger.Fatalw(nil, "tp-put-to-etcd-failed", log.Fields{"tpPath": kvPath, "err": err})
 		}
 		// verify the PUT succeeded.
 		kvResult, err := client.Get(context.Background(), kvPath)
 		if kvResult == nil {
-			log.Fatal("tp-not-found-on-kv-after-load", log.Fields{"key": kvPath, "err": err})
+			logger.Fatal(nil, "tp-not-found-on-kv-after-load", log.Fields{"key": kvPath, "err": err})
 		} else {
 			var KvTpIns techprofile.TechProfile
 			var resPtr = &KvTpIns
 			if value, err := kvstore.ToByte(kvResult.Value); err == nil {
 				if err = json.Unmarshal(value, resPtr); err != nil {
-					log.Fatal("error-unmarshal-kv-result", log.Fields{"err": err, "key": kvPath, "value": value})
+					logger.Fatal(nil, "error-unmarshal-kv-result", log.Fields{"err": err, "key": kvPath, "value": value})
 				} else {
-					log.Infow("verified-ok-that-tp-load-was-good", log.Fields{"tpID": tpID, "kvPath": kvPath})
+					logger.Infow(nil, "verified-ok-that-tp-load-was-good", log.Fields{"tpID": tpID, "kvPath": kvPath})
 					_ = jsonFile.Close()
 					continue
 				}
@@ -146,31 +142,31 @@
 
 	// Establish gRPC connection with the device
 	if om.clientConn, err = grpc.Dial(om.ipPort, grpc.WithInsecure(), grpc.WithBlock()); err != nil {
-		log.Errorw("Failed to dial device", log.Fields{"ipPort": om.ipPort, "err": err})
+		logger.Errorw(nil, "Failed to dial device", log.Fields{"ipPort": om.ipPort, "err": err})
 		return err
 	}
 	om.openOltClient = oop.NewOpenoltClient(om.clientConn)
 
 	// Populate Device Info
 	if deviceInfo, err := om.populateDeviceInfo(); err != nil {
-		log.Error("error fetching device info", log.Fields{"err": err, "deviceInfo": deviceInfo})
+		logger.Error(nil, "error fetching device info", log.Fields{"err": err, "deviceInfo": deviceInfo})
 		return err
 	}
 
 	// Read and load TPs to etcd.
 	om.readAndLoadTPsToEtcd()
 
-	log.Info("etcd-up-and-running--tp-loaded-successfully")
+	logger.Info(nil, "etcd-up-and-running--tp-loaded-successfully")
 
 	if om.rsrMgr = NewResourceMgr("ABCD", om.testConfig.KVStoreHost+":"+strconv.Itoa(om.testConfig.KVStorePort),
 		"etcd", "openolt", om.deviceInfo); om.rsrMgr == nil {
-		log.Error("Error while instantiating resource manager")
+		logger.Error(nil, "Error while instantiating resource manager")
 		return errors.New("instantiating resource manager failed")
 	}
 
 	om.TechProfile = make(map[uint32]*techprofile.TechProfileIf)
 	if err = om.populateTechProfilePerPonPort(); err != nil {
-		log.Error("Error while populating tech profile mgr\n")
+		logger.Error(nil, "Error while populating tech profile mgr\n")
 		return errors.New("error-loading-tech-profile-per-ponPort")
 	}
 
@@ -179,7 +175,7 @@
 
 	// Provision OLT NNI Trap flows as needed by the Workflow
 	if err = ProvisionNniTrapFlow(om.openOltClient, om.testConfig, om.rsrMgr); err != nil {
-		log.Error("failed-to-add-nni-trap-flow", log.Fields{"err": err})
+		logger.Error(nil, "failed-to-add-nni-trap-flow", log.Fields{"err": err})
 	}
 
 	// Provision ONUs one by one
@@ -193,16 +189,16 @@
 	var err error
 
 	if om.deviceInfo, err = om.openOltClient.GetDeviceInfo(context.Background(), new(oop.Empty)); err != nil {
-		log.Errorw("Failed to fetch device info", log.Fields{"err": err})
+		logger.Errorw(nil, "Failed to fetch device info", log.Fields{"err": err})
 		return nil, err
 	}
 
 	if om.deviceInfo == nil {
-		log.Errorw("Device info is nil", log.Fields{})
+		logger.Errorw(nil, "Device info is nil", log.Fields{})
 		return nil, errors.New("failed to get device info from OLT")
 	}
 
-	log.Debugw("Fetched device info", log.Fields{"deviceInfo": om.deviceInfo})
+	logger.Debugw(nil, "Fetched device info", log.Fields{"deviceInfo": om.deviceInfo})
 
 	return om.deviceInfo, nil
 }
@@ -221,20 +217,20 @@
 	// If the number of ONUs to provision is not a power of 2, stop execution
 	// This is needed for ensure even distribution of ONUs across all PONs
 	if !isPowerOfTwo(om.testConfig.NumOfOnu) {
-		log.Errorw("num-of-onus-to-provision-is-not-a-power-of-2", log.Fields{"numOfOnus": om.testConfig.NumOfOnu})
+		logger.Errorw(nil, "num-of-onus-to-provision-is-not-a-power-of-2", log.Fields{"numOfOnus": om.testConfig.NumOfOnu})
 		return
 	}
 
 	// Number of ONUs to provision should not be less than the number of PON ports.
 	// We need at least one ONU per PON
 	if om.testConfig.NumOfOnu < uint(om.deviceInfo.PonPorts) {
-		log.Errorw("num-of-onu-is-less-than-num-of-pon-port", log.Fields{"numOfOnus":om.testConfig.NumOfOnu, "numOfPon": om.deviceInfo.PonPorts})
+		logger.Errorw(nil, "num-of-onu-is-less-than-num-of-pon-port", log.Fields{"numOfOnus": om.testConfig.NumOfOnu, "numOfPon": om.deviceInfo.PonPorts})
 		return
 	}
 
 	numOfONUsPerPon = om.testConfig.NumOfOnu / uint(om.deviceInfo.PonPorts)
 	totalOnusToProvision := numOfONUsPerPon * uint(om.deviceInfo.PonPorts)
-	log.Infow("***** all-onu-provision-started ******",
+	logger.Infow(nil, "***** all-onu-provision-started ******",
 		log.Fields{"totalNumOnus": totalOnusToProvision,
 			"numOfOnusPerPon": numOfONUsPerPon,
 			"numOfPons":       om.deviceInfo.PonPorts})
@@ -258,12 +254,12 @@
 				om.lockRsrAlloc.Lock()
 				sn := GenerateNextONUSerialNumber()
 				om.lockRsrAlloc.Unlock()
-				log.Debugw("provisioning onu", log.Fields{"onuID": j, "ponPort": i, "serialNum": sn})
+				logger.Debugw(nil, "provisioning onu", log.Fields{"onuID": j, "ponPort": i, "serialNum": sn})
 				if onuID, err = om.rsrMgr.GetONUID(j); err != nil {
-					log.Errorw("error getting onu id", log.Fields{"err": err})
+					logger.Errorw(nil, "error getting onu id", log.Fields{"err": err})
 					continue
 				}
-				log.Infow("onu-provision-started-from-olt-manager", log.Fields{"onuId": onuID, "ponIntf": i})
+				logger.Infow(nil, "onu-provision-started-from-olt-manager", log.Fields{"onuId": onuID, "ponIntf": i})
 
 				onuWg.Add(1)
 				go om.activateONU(j, onuID, sn, om.stringifySerialNumber(sn), &onuWg)
@@ -273,10 +269,10 @@
 		onuWg.Wait()
 	}
 	endTime := time.Now()
-	log.Info("******** all-onu-provisioning-completed *******")
+	logger.Info(nil, "******** all-onu-provisioning-completed *******")
 	totalTime := endTime.Sub(startTime)
 	out := time.Time{}.Add(totalTime)
-	log.Infof("****** Total Time to provision all the ONUs is => %s", out.Format("15:04:05"))
+	logger.Infof(nil, "****** Total Time to provision all the ONUs is => %s", out.Format("15:04:05"))
 
 	// TODO: We need to dump the results at the end. But below json marshall does not work. We will need custom Marshal function.
 	/*
@@ -290,7 +286,7 @@
 }
 
 func (om *OpenOltManager) activateONU(intfID uint32, onuID uint32, serialNum *oop.SerialNumber, serialNumber string, onuWg *sync.WaitGroup) {
-	log.Debugw("activate-onu", log.Fields{"intfID": intfID, "onuID": onuID, "serialNum": serialNum, "serialNumber": serialNumber})
+	logger.Debugw(nil, "activate-onu", log.Fields{"intfID": intfID, "onuID": onuID, "serialNum": serialNum, "serialNumber": serialNumber})
 	// TODO: need resource manager
 	var pir uint32 = 1000000
 	var onuDevice = OnuDevice{
@@ -312,13 +308,13 @@
 	if _, err = om.openOltClient.ActivateOnu(context.Background(), &Onu); err != nil {
 		st, _ := status.FromError(err)
 		if st.Code() == codes.AlreadyExists {
-			log.Debug("ONU activation is in progress", log.Fields{"SerialNumber": serialNumber})
+			logger.Debug(nil, "ONU activation is in progress", log.Fields{"SerialNumber": serialNumber})
 		} else {
 			nanos = now.UnixNano()
 			milliEnd := nanos / 1000000
 			onuDevice.OnuProvisionEndTime = time.Unix(0, nanos)
 			onuDevice.OnuProvisionDurationInMs = milliEnd - milliStart
-			log.Errorw("activate-onu-failed", log.Fields{"Onu": Onu, "err ": err})
+			logger.Errorw(nil, "activate-onu-failed", log.Fields{"Onu": Onu, "err ": err})
 			onuDevice.Reason = err.Error()
 		}
 	} else {
@@ -327,7 +323,7 @@
 		onuDevice.OnuProvisionEndTime = time.Unix(0, nanos)
 		onuDevice.OnuProvisionDurationInMs = milliEnd - milliStart
 		onuDevice.Reason = ReasonOk
-		log.Infow("activated-onu", log.Fields{"SerialNumber": serialNumber})
+		logger.Infow(nil, "activated-onu", log.Fields{"SerialNumber": serialNumber})
 	}
 
 	om.lockOpenOltManager.Lock()
@@ -364,14 +360,14 @@
 
 // readIndications to read the indications from the OLT device
 func (om *OpenOltManager) readIndications() {
-	defer log.Errorw("Indications ended", log.Fields{})
+	defer logger.Errorw(nil, "Indications ended", log.Fields{})
 	indications, err := om.openOltClient.EnableIndication(context.Background(), new(oop.Empty))
 	if err != nil {
-		log.Errorw("Failed to read indications", log.Fields{"err": err})
+		logger.Errorw(nil, "Failed to read indications", log.Fields{"err": err})
 		return
 	}
 	if indications == nil {
-		log.Errorw("Indications is nil", log.Fields{})
+		logger.Errorw(nil, "Indications is nil", log.Fields{})
 		return
 	}
 
@@ -385,26 +381,26 @@
 	for {
 		indication, err := indications.Recv()
 		if err == io.EOF {
-			log.Infow("EOF for  indications", log.Fields{"err": err})
+			logger.Infow(nil, "EOF for  indications", log.Fields{"err": err})
 			// Use an exponential back off to prevent getting into a tight loop
 			duration := indicationBackoff.NextBackOff()
 			if duration == backoff.Stop {
 				// If we reach a maximum then warn and reset the backoff
 				// timer and keep attempting.
-				log.Warnw("Maximum indication backoff reached, resetting backoff timer",
+				logger.Warnw(nil, "Maximum indication backoff reached, resetting backoff timer",
 					log.Fields{"max_indication_backoff": indicationBackoff.MaxElapsedTime})
 				indicationBackoff.Reset()
 			}
 			time.Sleep(indicationBackoff.NextBackOff())
 			indications, err = om.openOltClient.EnableIndication(context.Background(), new(oop.Empty))
 			if err != nil {
-				log.Errorw("Failed to read indications", log.Fields{"err": err})
+				logger.Errorw(nil, "Failed to read indications", log.Fields{"err": err})
 				return
 			}
 			continue
 		}
 		if err != nil {
-			log.Infow("Failed to read from indications", log.Fields{"err": err})
+			logger.Infow(nil, "Failed to read from indications", log.Fields{"err": err})
 			break
 		}
 		// Reset backoff if we have a successful receive
@@ -417,42 +413,42 @@
 func (om *OpenOltManager) handleIndication(indication *oop.Indication) {
 	switch indication.Data.(type) {
 	case *oop.Indication_OltInd:
-		log.Info("received olt indication")
+		logger.Info(nil, "received olt indication")
 	case *oop.Indication_IntfInd:
 		intfInd := indication.GetIntfInd()
-		log.Infow("Received interface indication ", log.Fields{"InterfaceInd": intfInd})
+		logger.Infow(nil, "Received interface indication ", log.Fields{"InterfaceInd": intfInd})
 	case *oop.Indication_IntfOperInd:
 		intfOperInd := indication.GetIntfOperInd()
 		if intfOperInd.GetType() == "nni" {
-			log.Info("received interface oper indication for nni port")
+			logger.Info(nil, "received interface oper indication for nni port")
 		} else if intfOperInd.GetType() == "pon" {
-			log.Info("received interface oper indication for pon port")
+			logger.Info(nil, "received interface oper indication for pon port")
 		}
 		/*
 			case *oop.Indication_OnuDiscInd:
 				onuDiscInd := indication.GetOnuDiscInd()
-				log.Infow("Received Onu discovery indication ", log.Fields{"OnuDiscInd": onuDiscInd})
+				logger.Infow(nil, "Received Onu discovery indication ", log.Fields{"OnuDiscInd": onuDiscInd})
 		*/
 	case *oop.Indication_OnuInd:
 		onuInd := indication.GetOnuInd()
-		log.Infow("Received Onu indication ", log.Fields{"OnuInd": onuInd})
+		logger.Infow(nil, "Received Onu indication ", log.Fields{"OnuInd": onuInd})
 	case *oop.Indication_OmciInd:
 		omciInd := indication.GetOmciInd()
-		log.Debugw("Received Omci indication ", log.Fields{"IntfId": omciInd.IntfId, "OnuId": omciInd.OnuId, "pkt": hex.EncodeToString(omciInd.Pkt)})
+		logger.Debugw(nil, "Received Omci indication ", log.Fields{"IntfId": omciInd.IntfId, "OnuId": omciInd.OnuId, "pkt": hex.EncodeToString(omciInd.Pkt)})
 	case *oop.Indication_PktInd:
 		pktInd := indication.GetPktInd()
-		log.Infow("Received packet indication ", log.Fields{"PktInd": pktInd})
+		logger.Infow(nil, "Received packet indication ", log.Fields{"PktInd": pktInd})
 		/*
 				case *oop.Indication_PortStats:
 				portStats := indication.GetPortStats()
-				log.Infow("Received port stats", log.Fields{"portStats": portStats})
+				logger.Infow(nil, "Received port stats", log.Fields{"portStats": portStats})
 			case *oop.Indication_FlowStats:
 				flowStats := indication.GetFlowStats()
-				log.Infow("Received flow stats", log.Fields{"FlowStats": flowStats})
+				logger.Infow(nil, "Received flow stats", log.Fields{"FlowStats": flowStats})
 		*/
 	case *oop.Indication_AlarmInd:
 		alarmInd := indication.GetAlarmInd()
-		log.Infow("Received alarm indication ", log.Fields{"AlarmInd": alarmInd})
+		logger.Infow(nil, "Received alarm indication ", log.Fields{"AlarmInd": alarmInd})
 	}
 }
 
@@ -462,20 +458,20 @@
 		for _, intfID := range techRange.IntfIds {
 			om.TechProfile[intfID] = &(om.rsrMgr.ResourceMgrs[intfID].TechProfileMgr)
 			tpCount++
-			log.Debugw("Init tech profile done", log.Fields{"intfID": intfID})
+			logger.Debugw(nil, "Init tech profile done", log.Fields{"intfID": intfID})
 		}
 	}
 	//Make sure we have as many tech_profiles as there are pon ports on the device
 	if tpCount != int(om.deviceInfo.GetPonPorts()) {
-		log.Errorw("Error while populating techprofile",
+		logger.Errorw(nil, "Error while populating techprofile",
 			log.Fields{"numofTech": tpCount, "numPonPorts": om.deviceInfo.GetPonPorts()})
 		return errors.New("error while populating techprofile mgrs")
 	}
-	log.Infow("Populated techprofile for ponports successfully",
+	logger.Infow(nil, "Populated techprofile for ponports successfully",
 		log.Fields{"numofTech": tpCount, "numPonPorts": om.deviceInfo.GetPonPorts()})
 	return nil
 }
 
 func isPowerOfTwo(numOfOnus uint) bool {
 	return (numOfOnus & (numOfOnus - 1)) == 0
-}
\ No newline at end of file
+}
diff --git a/core/onu_manager.go b/core/onu_manager.go
index 4fd665b..5b44b28 100644
--- a/core/onu_manager.go
+++ b/core/onu_manager.go
@@ -22,14 +22,10 @@
 	"time"
 
 	"github.com/opencord/openolt-scale-tester/config"
-	"github.com/opencord/voltha-lib-go/v3/pkg/log"
-	oop "github.com/opencord/voltha-protos/v3/go/openolt"
+	"github.com/opencord/voltha-lib-go/v4/pkg/log"
+	oop "github.com/opencord/voltha-protos/v4/go/openolt"
 )
 
-func init() {
-	_, _ = log.AddPackage(log.JSON, log.DebugLevel, nil)
-}
-
 type SubscriberKey struct {
 	SubscriberName string
 }
@@ -53,7 +49,7 @@
 	onu.SubscriberMap = make(map[SubscriberKey]*Subscriber)
 	var subs uint
 	var subWg sync.WaitGroup
-	log.Infow("onu-provision-started-from-onu-manager", log.Fields{"onuID": onu.OnuID, "ponIntf": onu.PonIntf})
+	logger.Infow(nil, "onu-provision-started-from-onu-manager", log.Fields{"onuID": onu.OnuID, "ponIntf": onu.PonIntf})
 
 	for subs = 0; subs < onu.testConfig.SubscribersPerOnu; subs++ {
 		subsName := onu.SerialNum + "-" + strconv.Itoa(int(subs))
@@ -74,7 +70,7 @@
 		onu.SubscriberMap[subsKey] = &subs
 
 		subWg.Add(1)
-		log.Infow("subscriber-provision-started-from-onu-manager", log.Fields{"subsName": subsName})
+		logger.Infow(nil, "subscriber-provision-started-from-onu-manager", log.Fields{"subsName": subsName})
 		// Start provisioning the subscriber
 		go subs.Start(onu.testConfig.IsGroupTest)
 
@@ -85,5 +81,5 @@
 	// Signal that ONU provisioning is complete
 	onu.onuWg.Done()
 
-	log.Infow("onu-provision-completed-from-onu-manager", log.Fields{"onuID": onu.OnuID, "ponIntf": onu.PonIntf})
+	logger.Infow(nil, "onu-provision-completed-from-onu-manager", log.Fields{"onuID": onu.OnuID, "ponIntf": onu.PonIntf})
 }
diff --git a/core/resource_manager.go b/core/resource_manager.go
index d8674c9..255e1be 100644
--- a/core/resource_manager.go
+++ b/core/resource_manager.go
@@ -18,20 +18,14 @@
 package core
 
 import (
-	"strconv"
-	"strings"
 	"sync"
 
-	"github.com/opencord/voltha-lib-go/v3/pkg/log"
-	ponrmgr "github.com/opencord/voltha-lib-go/v3/pkg/ponresourcemanager"
-	"github.com/opencord/voltha-protos/v3/go/openolt"
+	"github.com/opencord/voltha-lib-go/v4/pkg/log"
+	ponrmgr "github.com/opencord/voltha-lib-go/v4/pkg/ponresourcemanager"
+	"github.com/opencord/voltha-protos/v4/go/openolt"
 	"golang.org/x/net/context"
 )
 
-func init() {
-	_, _ = log.AddPackage(log.JSON, log.DebugLevel, nil)
-}
-
 // OpenOltResourceMgr holds resource related information as provided below for each field
 type OpenOltResourceMgr struct {
 	deviceInfo *openolt.DeviceInfo
@@ -47,6 +41,8 @@
 
 	// array of pon resource managers per interface technology
 	ResourceMgrs map[uint32]*ponrmgr.PONResourceManager
+
+	flow_id uint64
 }
 
 // NewResourceMgr init a New resource manager instance which in turn instantiates pon resource manager
@@ -54,7 +50,7 @@
 // the resources.
 func NewResourceMgr(deviceID string, KVStoreHostPort string, kvStoreType string, deviceType string, devInfo *openolt.DeviceInfo) *OpenOltResourceMgr {
 	var ResourceMgr OpenOltResourceMgr
-	log.Debugf("Init new resource manager")
+	logger.Debugf(nil, "Init new resource manager")
 
 	ResourceMgr.deviceInfo = devInfo
 	NumPONPorts := devInfo.GetPonPorts()
@@ -118,16 +114,14 @@
 	// each technology is represented by only a single range
 	var GlobalPONRsrcMgr *ponrmgr.PONResourceManager
 	var err error
-	IPPort := strings.Split(KVStoreHostPort, ":")
 	for _, TechRange := range devInfo.Ranges {
 		technology := TechRange.Technology
-		log.Debugf("Device info technology %s", technology)
+		logger.Debugf(nil, "Device info technology %s", technology)
 		Ranges[technology] = TechRange
-		port, _ := strconv.Atoi(IPPort[1])
-		RsrcMgrsByTech[technology], err = ponrmgr.NewPONResourceManager(technology, deviceType, deviceID,
-			kvStoreType, IPPort[0], port)
+		RsrcMgrsByTech[technology], err = ponrmgr.NewPONResourceManager(nil, technology, deviceType, deviceID,
+			kvStoreType, KVStoreHostPort)
 		if err != nil {
-			log.Errorf("Failed to create pon resource manager instance for technology %s", technology)
+			logger.Errorf(nil, "Failed to create pon resource manager instance for technology %s", technology)
 			return nil
 		}
 		// resource_mgrs_by_tech[technology] = resource_mgr
@@ -146,7 +140,7 @@
 	for _, PONRMgr := range RsrcMgrsByTech {
 		_ = PONRMgr.InitDeviceResourcePool(context.Background())
 	}
-	log.Info("Initialization of  resource manager success!")
+	logger.Info(nil, "Initialization of  resource manager success!")
 	return &ResourceMgr
 }
 
@@ -159,11 +153,11 @@
 
 	// init the resource range pool according to the sharing type
 
-	log.Debugf("Resource range pool init for technology %s", ponRMgr.Technology)
+	logger.Debugf(nil, "Resource range pool init for technology %s", ponRMgr.Technology)
 	// first load from KV profiles
 	status := ponRMgr.InitResourceRangesFromKVStore(context.Background())
 	if !status {
-		log.Debugf("Failed to load resource ranges from KV store for tech %s", ponRMgr.Technology)
+		logger.Debugf(nil, "Failed to load resource ranges from KV store for tech %s", ponRMgr.Technology)
 	}
 
 	/*
@@ -171,7 +165,7 @@
 	   or is broader than the device, the device's information will
 	   dictate the range limits
 	*/
-	log.Debugf("Using device info to init pon resource ranges for tech", ponRMgr.Technology)
+	logger.Debugf(nil, "Using device info to init pon resource ranges for tech", ponRMgr.Technology)
 
 	ONUIDStart := devInfo.OnuIdStart
 	ONUIDEnd := devInfo.OnuIdEnd
@@ -236,7 +230,7 @@
 		}
 	}
 
-	log.Debugw("Device info init", log.Fields{"technology": techRange.Technology,
+	logger.Debugw(nil, "Device info init", log.Fields{"technology": techRange.Technology,
 		"onu_id_start": ONUIDStart, "onu_id_end": ONUIDEnd, "onu_id_shared_pool_id": ONUIDSharedPoolID,
 		"alloc_id_start": AllocIDStart, "alloc_id_end": AllocIDEnd,
 		"alloc_id_shared_pool_id": AllocIDSharedPoolID,
@@ -250,7 +244,7 @@
 		"uni_id_end_idx":            1, /*MaxUNIIDperONU()*/
 	})
 
-	ponRMgr.InitDefaultPONResourceRanges(ONUIDStart, ONUIDEnd, ONUIDSharedPoolID,
+	ponRMgr.InitDefaultPONResourceRanges(nil, ONUIDStart, ONUIDEnd, ONUIDSharedPoolID,
 		AllocIDStart, AllocIDEnd, AllocIDSharedPoolID,
 		GEMPortIDStart, GEMPortIDEnd, GEMPortIDSharedPoolID,
 		FlowIDStart, FlowIDEnd, FlowIDSharedPoolID, 0, 1,
@@ -259,44 +253,44 @@
 	// For global sharing, make sure to refresh both local and global resource manager instances' range
 
 	if ONUIDShared == openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH {
-		globalPONRMgr.UpdateRanges(ponrmgr.ONU_ID_START_IDX, ONUIDStart, ponrmgr.ONU_ID_END_IDX, ONUIDEnd,
+		globalPONRMgr.UpdateRanges(nil, ponrmgr.ONU_ID_START_IDX, ONUIDStart, ponrmgr.ONU_ID_END_IDX, ONUIDEnd,
 			"", 0, nil)
-		ponRMgr.UpdateRanges(ponrmgr.ONU_ID_START_IDX, ONUIDStart, ponrmgr.ONU_ID_END_IDX, ONUIDEnd,
+		ponRMgr.UpdateRanges(nil, ponrmgr.ONU_ID_START_IDX, ONUIDStart, ponrmgr.ONU_ID_END_IDX, ONUIDEnd,
 			"", 0, globalPONRMgr)
 	}
 	if AllocIDShared == openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH {
-		globalPONRMgr.UpdateRanges(ponrmgr.ALLOC_ID_START_IDX, AllocIDStart, ponrmgr.ALLOC_ID_END_IDX, AllocIDEnd,
+		globalPONRMgr.UpdateRanges(nil, ponrmgr.ALLOC_ID_START_IDX, AllocIDStart, ponrmgr.ALLOC_ID_END_IDX, AllocIDEnd,
 			"", 0, nil)
 
-		ponRMgr.UpdateRanges(ponrmgr.ALLOC_ID_START_IDX, AllocIDStart, ponrmgr.ALLOC_ID_END_IDX, AllocIDEnd,
+		ponRMgr.UpdateRanges(nil, ponrmgr.ALLOC_ID_START_IDX, AllocIDStart, ponrmgr.ALLOC_ID_END_IDX, AllocIDEnd,
 			"", 0, globalPONRMgr)
 	}
 	if GEMPortIDShared == openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH {
-		globalPONRMgr.UpdateRanges(ponrmgr.GEMPORT_ID_START_IDX, GEMPortIDStart, ponrmgr.GEMPORT_ID_END_IDX, GEMPortIDEnd,
+		globalPONRMgr.UpdateRanges(nil, ponrmgr.GEMPORT_ID_START_IDX, GEMPortIDStart, ponrmgr.GEMPORT_ID_END_IDX, GEMPortIDEnd,
 			"", 0, nil)
-		ponRMgr.UpdateRanges(ponrmgr.GEMPORT_ID_START_IDX, GEMPortIDStart, ponrmgr.GEMPORT_ID_END_IDX, GEMPortIDEnd,
+		ponRMgr.UpdateRanges(nil, ponrmgr.GEMPORT_ID_START_IDX, GEMPortIDStart, ponrmgr.GEMPORT_ID_END_IDX, GEMPortIDEnd,
 			"", 0, globalPONRMgr)
 	}
 	if FlowIDShared == openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH {
-		globalPONRMgr.UpdateRanges(ponrmgr.FLOW_ID_START_IDX, FlowIDStart, ponrmgr.FLOW_ID_END_IDX, FlowIDEnd,
+		globalPONRMgr.UpdateRanges(nil, ponrmgr.FLOW_ID_START_IDX, FlowIDStart, ponrmgr.FLOW_ID_END_IDX, FlowIDEnd,
 			"", 0, nil)
-		ponRMgr.UpdateRanges(ponrmgr.FLOW_ID_START_IDX, FlowIDStart, ponrmgr.FLOW_ID_END_IDX, FlowIDEnd,
+		ponRMgr.UpdateRanges(nil, ponrmgr.FLOW_ID_START_IDX, FlowIDStart, ponrmgr.FLOW_ID_END_IDX, FlowIDEnd,
 			"", 0, globalPONRMgr)
 	}
 
 	// Make sure loaded range fits the platform bit encoding ranges
-	ponRMgr.UpdateRanges(ponrmgr.UNI_ID_START_IDX, 0, ponrmgr.UNI_ID_END_IDX /* TODO =OpenOltPlatform.MAX_UNIS_PER_ONU-1*/, 1, "", 0, nil)
+	ponRMgr.UpdateRanges(nil, ponrmgr.UNI_ID_START_IDX, 0, ponrmgr.UNI_ID_END_IDX /* TODO =OpenOltPlatform.MAX_UNIS_PER_ONU-1*/, 1, "", 0, nil)
 }
 
 // Delete clears used resources for the particular olt device being deleted
 func (RsrcMgr *OpenOltResourceMgr) Delete() error {
 	for _, rsrcMgr := range RsrcMgr.ResourceMgrs {
 		if err := rsrcMgr.ClearDeviceResourcePool(context.Background()); err != nil {
-			log.Debug("Failed to clear device resource pool")
+			logger.Debug(nil, "Failed to clear device resource pool")
 			return err
 		}
 	}
-	log.Debug("Cleared device resource pool")
+	logger.Debug(nil, "Cleared device resource pool")
 	return nil
 }
 
@@ -308,7 +302,7 @@
 	ONUIDs, err := RsrcMgr.ResourceMgrs[ponIntfID].GetResourceID(context.Background(), ponIntfID,
 		ponrmgr.ONU_ID, 1)
 	if err != nil {
-		log.Errorf("Failed to get resource for interface %d for type %s",
+		logger.Errorf(nil, "Failed to get resource for interface %d for type %s",
 			ponIntfID, ponrmgr.ONU_ID)
 		return uint32(0), err
 	}
@@ -316,15 +310,9 @@
 }
 
 // GetFlowID return flow ID for a given pon interface id, onu id and uni id
-func (RsrcMgr *OpenOltResourceMgr) GetFlowID(ctx context.Context, ponIntfID uint32) (uint32, error) {
+func (RsrcMgr *OpenOltResourceMgr) GetFlowID(ctx context.Context, ponIntfID uint32) (uint64, error) {
 	RsrcMgr.FlowIDMgmtLock.Lock()
 	defer RsrcMgr.FlowIDMgmtLock.Unlock()
-	FlowIDs, err := RsrcMgr.ResourceMgrs[ponIntfID].GetResourceID(context.Background(), ponIntfID,
-		ponrmgr.FLOW_ID, 1)
-	if err != nil {
-		log.Errorf("Failed to get resource for interface %d for type %s",
-			ponIntfID, ponrmgr.FLOW_ID)
-		return uint32(0), err
-	}
-	return FlowIDs[0], err
+	RsrcMgr.flow_id++
+	return RsrcMgr.flow_id, nil
 }
diff --git a/core/subscriber_manager.go b/core/subscriber_manager.go
index 3cf65ad..9abec13 100644
--- a/core/subscriber_manager.go
+++ b/core/subscriber_manager.go
@@ -21,16 +21,12 @@
 	"sync"
 
 	"github.com/opencord/openolt-scale-tester/config"
-	"github.com/opencord/voltha-lib-go/v3/pkg/log"
-	"github.com/opencord/voltha-lib-go/v3/pkg/techprofile"
-	oop "github.com/opencord/voltha-protos/v3/go/openolt"
+	"github.com/opencord/voltha-lib-go/v4/pkg/log"
+	"github.com/opencord/voltha-lib-go/v4/pkg/techprofile"
+	oop "github.com/opencord/voltha-protos/v4/go/openolt"
 	"golang.org/x/net/context"
 )
 
-func init() {
-	_, _ = log.AddPackage(log.JSON, log.DebugLevel, nil)
-}
-
 const (
 	SUBSCRIBER_PROVISION_SUCCESS = iota
 	TP_INSTANCE_CREATION_FAILED
@@ -41,7 +37,7 @@
 )
 
 const (
-	UniPortName = "pon-{%d}/onu-{%d}/uni-{%d}"
+	UniPortName = "olt-{1}/pon-{%d}/onu-{%d}/uni-{%d}"
 )
 
 var Reason = [...]string{
@@ -92,20 +88,20 @@
 
 func (subs *Subscriber) Start(isGroup bool) {
 
-	var err error
-
-	log.Infow("workflow-deploy-started-for-subscriber", log.Fields{"subsName": subs.SubscriberName})
+	logger.Infow(nil, "workflow-deploy-started-for-subscriber", log.Fields{"subsName": subs.SubscriberName})
 
 	subs.TpInstance = make(map[int]*techprofile.TechProfile)
 
 	for _, tpID := range subs.TestConfig.TpIDList {
 		uniPortName := fmt.Sprintf(UniPortName, subs.PonIntf, subs.OnuID, subs.UniID)
 		subs.RsrMgr.GemIDAllocIDLock[subs.PonIntf].Lock()
-		subs.TpInstance[tpID], err = subs.RsrMgr.ResourceMgrs[subs.PonIntf].TechProfileMgr.CreateTechProfInstance(context.Background(),
-			uint32(tpID), uniPortName, subs.PonIntf)
+		tpInstInterface, err := subs.RsrMgr.ResourceMgrs[subs.PonIntf].TechProfileMgr.CreateTechProfInstance(context.Background(), uint32(tpID), uniPortName, subs.PonIntf)
+		// TODO: Assumes the techprofile is of type TechProfile (XGPON, GPON). But it could also be EPON TechProfile type. But we do not support that at the moment, so it is OK.
+		subs.TpInstance[tpID] = tpInstInterface.(*techprofile.TechProfile)
+
 		subs.RsrMgr.GemIDAllocIDLock[subs.PonIntf].Unlock()
 		if err != nil {
-			log.Errorw("error-creating-tp-instance-for-subs",
+			logger.Errorw(nil, "error-creating-tp-instance-for-subs",
 				log.Fields{"subsName": subs.SubscriberName, "onuID": subs.OnuID, "tpID": tpID})
 
 			subs.Reason = ReasonCodeToReasonString(TP_INSTANCE_CREATION_FAILED)
@@ -115,5 +111,5 @@
 
 	go DeployWorkflow(subs, isGroup)
 
-	log.Infow("workflow-deploy-started-for-subscriber", log.Fields{"subsName": subs.SubscriberName})
+	logger.Infow(nil, "workflow-deploy-started-for-subscriber", log.Fields{"subsName": subs.SubscriberName})
 }
diff --git a/core/tt_workflow.go b/core/tt_workflow.go
index 7b14007..783dfe6 100644
--- a/core/tt_workflow.go
+++ b/core/tt_workflow.go
@@ -22,9 +22,9 @@
 	"sync/atomic"
 
 	"github.com/opencord/openolt-scale-tester/config"
-	"github.com/opencord/voltha-lib-go/v3/pkg/log"
-	oop "github.com/opencord/voltha-protos/v3/go/openolt"
-	tp_pb "github.com/opencord/voltha-protos/v3/go/tech_profile"
+	"github.com/opencord/voltha-lib-go/v4/pkg/log"
+	oop "github.com/opencord/voltha-protos/v4/go/openolt"
+	tp_pb "github.com/opencord/voltha-protos/v4/go/tech_profile"
 	"golang.org/x/net/context"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
@@ -32,16 +32,12 @@
 
 var lastPonIntf *uint32 = new(uint32)
 
-func init() {
-	_, _ = log.AddPackage(log.JSON, log.DebugLevel, nil)
-}
-
 // A dummy struct to comply with the WorkFlow interface.
 type TtWorkFlow struct {
 }
 
 func AddTtDhcpIPV4Flow(oo oop.OpenoltClient, config *config.OpenOltScaleTesterConfig, rsrMgr *OpenOltResourceMgr) error {
-	var flowID uint32
+	var flowID uint64
 	var err error
 
 	// Allocating flowID from PON0 pool for an trap-from-nni flow
@@ -63,21 +59,21 @@
 
 	st, _ := status.FromError(err)
 	if st.Code() == codes.AlreadyExists {
-		log.Debugw("Flow already exists", log.Fields{"err": err, "deviceFlow": flow})
+		logger.Debugw(nil, "Flow already exists", log.Fields{"err": err, "deviceFlow": flow})
 		return nil
 	}
 
 	if err != nil {
-		log.Errorw("Failed to Add DHCP IPv4 to device", log.Fields{"err": err, "deviceFlow": flow})
+		logger.Errorw(nil, "Failed to Add DHCP IPv4 to device", log.Fields{"err": err, "deviceFlow": flow})
 		return err
 	}
-	log.Debugw("DHCP IPV4 added to device successfully ", log.Fields{"flow": flow})
+	logger.Debugw(nil, "DHCP IPV4 added to device successfully ", log.Fields{"flow": flow})
 
 	return nil
 }
 
 func AddTtDhcpIPV6Flow(oo oop.OpenoltClient, config *config.OpenOltScaleTesterConfig, rsrMgr *OpenOltResourceMgr) error {
-	log.Info("tt-workflow-does-not-require-dhcp-ipv6-support--nothing-to-do")
+	logger.Info(nil, "tt-workflow-does-not-require-dhcp-ipv6-support--nothing-to-do")
 	return nil
 }
 
@@ -140,13 +136,13 @@
 			flowClassifier.OVid = 75
 			flowClassifier.PktTagType = SingleTag
 		default:
-			log.Errorw("Unsupported TT flow type", log.Fields{"flowtype": flowType,
+			logger.Errorw(nil, "Unsupported TT flow type", log.Fields{"flowtype": flowType,
 				"direction": direction})
 		}
 	} else if direction == Downstream {
 		switch flowType {
 		case IgmpFlow:
-			log.Errorw("Downstream IGMP flows are not required instead we have "+
+			logger.Errorw(nil, "Downstream IGMP flows are not required instead we have "+
 				"IGMP trap flows already installed", log.Fields{"flowtype": flowType,
 				"direction": direction})
 		case HsiaFlow:
@@ -192,41 +188,46 @@
 			flowClassifier.DstMac = GenerateMac(true)
 			flowClassifier.PktTagType = DoubleTag
 		default:
-			log.Errorw("Unsupported TT flow type", log.Fields{"flowtype": flowType,
+			logger.Errorw(nil, "Unsupported TT flow type", log.Fields{"flowtype": flowType,
 				"direction": direction})
 		}
 	}
 	return flowClassifier, actionInfo
 }
 
-func AddTtFlow(subs *Subscriber, flowType string, direction string, flowID uint32,
-	allocID uint32, gemID uint32, pcp uint32) error {
-	log.Infow("add-flow", log.Fields{"WorkFlow": subs.TestConfig.WorkflowName, "FlowType": flowType,
+func AddTtFlow(subs *Subscriber, flowType string, direction string, flowID uint64,
+	allocID uint32, gemID uint32, pcp uint32, replicateFlow bool, symmetricFlowID uint64,
+	pbitToGem map[uint32]uint32) error {
+	logger.Infow(nil, "add-flow", log.Fields{"WorkFlow": subs.TestConfig.WorkflowName, "FlowType": flowType,
 		"direction": direction, "flowID": flowID})
 	var err error
 
 	flowClassifier, actionInfo := FormatTtClassfierAction(flowType, direction, subs)
-	// Update the o_pbit for which this flow has to be classified
-	flowClassifier.OPbits = pcp
+	// Update the o_pbit (if valid) for which this flow has to be classified
+	if pcp != 0xff {
+		flowClassifier.OPbits = pcp
+	}
 	flow := oop.Flow{AccessIntfId: int32(subs.PonIntf), OnuId: int32(subs.OnuID),
 		UniId: int32(subs.UniID), FlowId: flowID,
 		FlowType: direction, AllocId: int32(allocID), GemportId: int32(gemID),
 		Classifier: &flowClassifier, Action: &actionInfo,
-		Priority: 1000, PortNo: subs.UniPortNo}
+		Priority: 1000, PortNo: subs.UniPortNo,
+		SymmetricFlowId: symmetricFlowID,
+		ReplicateFlow:   replicateFlow, PbitToGemport: pbitToGem}
 
 	_, err = subs.OpenOltClient.FlowAdd(context.Background(), &flow)
 
 	st, _ := status.FromError(err)
 	if st.Code() == codes.AlreadyExists {
-		log.Debugw("Flow already exists", log.Fields{"err": err, "deviceFlow": flow})
+		logger.Debugw(nil, "Flow already exists", log.Fields{"err": err, "deviceFlow": flow})
 		return nil
 	}
 
 	if err != nil {
-		log.Errorw("Failed to Add flow to device", log.Fields{"err": err, "deviceFlow": flow})
+		logger.Errorw(nil, "Failed to Add flow to device", log.Fields{"err": err, "deviceFlow": flow})
 		return errors.New(ReasonCodeToReasonString(FLOW_ADD_FAILED))
 	}
-	log.Debugw("Flow added to device successfully ", log.Fields{"flow": flow})
+	logger.Debugw(nil, "Flow added to device successfully ", log.Fields{"flow": flow})
 
 	return nil
 }
@@ -234,75 +235,75 @@
 func (tt TtWorkFlow) ProvisionScheds(subs *Subscriber) error {
 	var trafficSched []*tp_pb.TrafficScheduler
 
-	log.Info("provisioning-scheds")
+	logger.Info(nil, "provisioning-scheds")
 
 	if trafficSched = getTrafficSched(subs, tp_pb.Direction_DOWNSTREAM); trafficSched == nil {
-		log.Error("ds-traffic-sched-is-nil")
+		logger.Error(nil, "ds-traffic-sched-is-nil")
 		return errors.New(ReasonCodeToReasonString(SCHED_CREATION_FAILED))
 	}
 
-	log.Debugw("Sending Traffic scheduler create to device",
+	logger.Debugw(nil, "Sending Traffic scheduler create to device",
 		log.Fields{"Direction": tp_pb.Direction_DOWNSTREAM, "TrafficScheds": trafficSched})
 	if _, err := subs.OpenOltClient.CreateTrafficSchedulers(context.Background(), &tp_pb.TrafficSchedulers{
 		IntfId: subs.PonIntf, OnuId: subs.OnuID,
 		UniId: subs.UniID, PortNo: subs.UniPortNo,
 		TrafficScheds: trafficSched}); err != nil {
-		log.Errorw("Failed to create traffic schedulers", log.Fields{"error": err})
+		logger.Errorw(nil, "Failed to create traffic schedulers", log.Fields{"error": err})
 		return errors.New(ReasonCodeToReasonString(SCHED_CREATION_FAILED))
 	}
 
 	if trafficSched = getTrafficSched(subs, tp_pb.Direction_UPSTREAM); trafficSched == nil {
-		log.Error("us-traffic-sched-is-nil")
+		logger.Error(nil, "us-traffic-sched-is-nil")
 		return errors.New(ReasonCodeToReasonString(SCHED_CREATION_FAILED))
 	}
 
-	log.Debugw("Sending Traffic scheduler create to device",
+	logger.Debugw(nil, "Sending Traffic scheduler create to device",
 		log.Fields{"Direction": tp_pb.Direction_UPSTREAM, "TrafficScheds": trafficSched})
 	if _, err := subs.OpenOltClient.CreateTrafficSchedulers(context.Background(), &tp_pb.TrafficSchedulers{
 		IntfId: subs.PonIntf, OnuId: subs.OnuID,
 		UniId: subs.UniID, PortNo: subs.UniPortNo,
 		TrafficScheds: trafficSched}); err != nil {
-		log.Errorw("Failed to create traffic schedulers", log.Fields{"error": err})
+		logger.Errorw(nil, "Failed to create traffic schedulers", log.Fields{"error": err})
 		return errors.New(ReasonCodeToReasonString(SCHED_CREATION_FAILED))
 	}
 	return nil
 }
 
 func (tt TtWorkFlow) ProvisionQueues(subs *Subscriber) error {
-	log.Info("provisioning-queues")
+	logger.Info(nil, "provisioning-queues")
 
 	var trafficQueues []*tp_pb.TrafficQueue
 	if trafficQueues = getTrafficQueues(subs, tp_pb.Direction_DOWNSTREAM); trafficQueues == nil {
-		log.Error("Failed to create traffic queues")
+		logger.Error(nil, "Failed to create traffic queues")
 		return errors.New(ReasonCodeToReasonString(QUEUE_CREATION_FAILED))
 	}
 
 	// On receiving the CreateTrafficQueues request, the driver should create corresponding
 	// downstream queues.
-	log.Debugw("Sending Traffic Queues create to device",
+	logger.Debugw(nil, "Sending Traffic Queues create to device",
 		log.Fields{"Direction": tp_pb.Direction_DOWNSTREAM, "TrafficQueues": trafficQueues})
 	if _, err := subs.OpenOltClient.CreateTrafficQueues(context.Background(),
 		&tp_pb.TrafficQueues{IntfId: subs.PonIntf, OnuId: subs.OnuID,
 			UniId: subs.UniID, PortNo: subs.UniPortNo,
 			TrafficQueues: trafficQueues}); err != nil {
-		log.Errorw("Failed to create traffic queues in device", log.Fields{"error": err})
+		logger.Errorw(nil, "Failed to create traffic queues in device", log.Fields{"error": err})
 		return errors.New(ReasonCodeToReasonString(QUEUE_CREATION_FAILED))
 	}
 
 	if trafficQueues = getTrafficQueues(subs, tp_pb.Direction_UPSTREAM); trafficQueues == nil {
-		log.Error("Failed to create traffic queues")
+		logger.Error(nil, "Failed to create traffic queues")
 		return errors.New(ReasonCodeToReasonString(QUEUE_CREATION_FAILED))
 	}
 
 	// On receiving the CreateTrafficQueues request, the driver should create corresponding
 	// upstream queues.
-	log.Debugw("Sending Traffic Queues create to device",
+	logger.Debugw(nil, "Sending Traffic Queues create to device",
 		log.Fields{"Direction": tp_pb.Direction_UPSTREAM, "TrafficQueues": trafficQueues})
 	if _, err := subs.OpenOltClient.CreateTrafficQueues(context.Background(),
 		&tp_pb.TrafficQueues{IntfId: subs.PonIntf, OnuId: subs.OnuID,
 			UniId: subs.UniID, PortNo: subs.UniPortNo,
 			TrafficQueues: trafficQueues}); err != nil {
-		log.Errorw("Failed to create traffic queues in device", log.Fields{"error": err})
+		logger.Errorw(nil, "Failed to create traffic queues in device", log.Fields{"error": err})
 		return errors.New(ReasonCodeToReasonString(QUEUE_CREATION_FAILED))
 	}
 
@@ -310,29 +311,31 @@
 }
 
 func (tt TtWorkFlow) ProvisionEapFlow(subs *Subscriber) error {
-	log.Info("tt-workflow-does-not-support-eap-yet--nothing-to-do")
+	logger.Info(nil, "tt-workflow-does-not-support-eap-yet--nothing-to-do")
 	return nil
 }
 
 func (tt TtWorkFlow) ProvisionDhcpIPV4Flow(subs *Subscriber) error {
-	log.Info("tt-workflow-does-not-require-dhcp-ipv4-yet--nothing-to-do")
+	logger.Info(nil, "tt-workflow-does-not-require-dhcp-ipv4-yet--nothing-to-do")
 	return nil
 }
 
 func (tt TtWorkFlow) ProvisionDhcpIPV6Flow(subs *Subscriber) error {
-	log.Info("tt-workflow-does-not-require-dhcp-ipv6-support--nothing-to-do")
+	logger.Info(nil, "tt-workflow-does-not-require-dhcp-ipv6-support--nothing-to-do")
 	return nil
 }
 
 func (tt TtWorkFlow) ProvisionIgmpFlow(subs *Subscriber) error {
-	log.Info("tt-workflow-does-not-require-igmp-support--nothing-to-do")
+	logger.Info(nil, "tt-workflow-does-not-require-igmp-support--nothing-to-do")
 	return nil
 }
 
 func (tt TtWorkFlow) ProvisionHsiaFlow(subs *Subscriber) error {
-	var err error
-	var flowID uint32
 	var gemPortIDs []uint32
+	var err error
+	var flowIDUs, flowIDDs uint64
+	pbitToGem := make(map[uint32]uint32)
+	var pcp uint32
 
 	var allocID = subs.TpInstance[subs.TestConfig.TpIDList[0]].UsScheduler.AllocID
 	for _, gem := range subs.TpInstance[subs.TestConfig.TpIDList[0]].UpstreamGemPortAttributeList {
@@ -343,37 +346,45 @@
 		pBitMap := subs.TpInstance[subs.TestConfig.TpIDList[0]].UpstreamGemPortAttributeList[idx].PbitMap
 		for pos, pbitSet := range strings.TrimPrefix(pBitMap, "0b") {
 			if pbitSet == '1' {
-				pcp := uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
-				if flowID, err = subs.RsrMgr.GetFlowID(context.Background(), uint32(subs.PonIntf)); err != nil {
+				pcp = uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
+				var errUs, errDs error
+				if flowIDUs, err = subs.RsrMgr.GetFlowID(context.Background(), subs.PonIntf); err != nil {
 					return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
-				} else {
-					var errUs, errDs error
-					if errUs = AddTtFlow(subs, HsiaFlow, Upstream, flowID, allocID, gemID, pcp); errUs != nil {
-						log.Errorw("failed to install US HSIA flow",
-							log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
-					}
-					if errDs = AddTtFlow(subs, HsiaFlow, Downstream, flowID, allocID, gemID, pcp); errDs != nil {
-						log.Errorw("failed to install DS HSIA flow",
-							log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
-					}
+				}
+				if errUs = AddTtFlow(subs, HsiaFlow, Upstream, flowIDUs, allocID, gemID, pcp, false,
+					0, pbitToGem); errUs != nil {
+					logger.Errorw(nil, "failed to install US HSIA flow",
+						log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
+				}
+				if flowIDDs, err = subs.RsrMgr.GetFlowID(context.Background(), subs.PonIntf); err != nil {
+					return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
+				}
+				if errDs = AddTtFlow(subs, HsiaFlow, Downstream, flowIDDs, allocID, gemID, pcp, false,
+					flowIDUs, pbitToGem); errDs != nil {
+					logger.Errorw(nil, "failed to install DS HSIA flow",
+						log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
+				}
 
-					if errUs != nil || errDs != nil {
-						if errUs != nil {
-							return errUs
-						}
-						return errDs
+				if errUs != nil || errDs != nil {
+					if errUs != nil {
+						return errUs
 					}
+					return errDs
 				}
 			}
 		}
 	}
+
 	return nil
 }
 
 func (tt TtWorkFlow) ProvisionVoipFlow(subs *Subscriber) error {
 	var err error
-	var flowID uint32
 	var gemPortIDs []uint32
+	var errUs, errDs, errDhcp error
+	var flowIDUs, flowIDDs, flowIDDhcp uint64
+	pbitToGem := make(map[uint32]uint32)
+	var pcp uint32
 
 	var allocID = subs.TpInstance[subs.TestConfig.TpIDList[0]].UsScheduler.AllocID
 	for _, gem := range subs.TpInstance[subs.TestConfig.TpIDList[0]].UpstreamGemPortAttributeList {
@@ -384,44 +395,55 @@
 		pBitMap := subs.TpInstance[subs.TestConfig.TpIDList[0]].UpstreamGemPortAttributeList[idx].PbitMap
 		for pos, pbitSet := range strings.TrimPrefix(pBitMap, "0b") {
 			if pbitSet == '1' {
-				pcp := uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
-				if flowID, err = subs.RsrMgr.GetFlowID(context.Background(), uint32(subs.PonIntf)); err != nil {
+				pcp = uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
+				if flowIDUs, err = subs.RsrMgr.GetFlowID(context.Background(), subs.PonIntf); err != nil {
 					return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
-				} else {
-					var errUs, errDs, errDhcp error
-					if errUs = AddTtFlow(subs, VoipFlow, Upstream, flowID, allocID, gemID, pcp); errUs != nil {
-						log.Errorw("failed to install US VOIP flow",
-							log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
-					}
-					if errDs = AddTtFlow(subs, VoipFlow, Downstream, flowID, allocID, gemID, pcp); errDs != nil {
-						log.Errorw("failed to install DS VOIP flow",
-							log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
-					}
-					if errDhcp = AddFlow(subs, DhcpFlowIPV4, Upstream, flowID, allocID, gemID, pcp); errDhcp != nil {
-						log.Errorw("failed to install US VOIP-DHCP flow",
-							log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
-					}
+				}
+				if errUs = AddTtFlow(subs, VoipFlow, Upstream, flowIDUs, allocID, gemID, pcp, false,
+					0, pbitToGem); errUs != nil {
+					logger.Errorw(nil, "failed to install US VOIP flow",
+						log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
+				}
+				if flowIDDs, err = subs.RsrMgr.GetFlowID(context.Background(), subs.PonIntf); err != nil {
+					return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
+				}
+				if errDs = AddTtFlow(subs, VoipFlow, Downstream, flowIDDs, allocID, gemID, pcp, false,
+					flowIDUs, pbitToGem); errDs != nil {
+					logger.Errorw(nil, "failed to install DS VOIP flow",
+						log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
+				}
+				if flowIDDhcp, err = subs.RsrMgr.GetFlowID(context.Background(), subs.PonIntf); err != nil {
+					return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
+				}
+				if errDhcp = AddFlow(subs, DhcpFlowIPV4, Upstream, flowIDDhcp, allocID, gemID, pcp, false,
+					0, pbitToGem); errDhcp != nil {
+					logger.Errorw(nil, "failed to install US VOIP-DHCP flow",
+						log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
+				}
 
-					if errUs != nil || errDs != nil || errDhcp != nil {
-						if errUs != nil {
-							return errUs
-						}
-						if errDs != nil {
-							return errDs
-						}
-						return errDhcp
+				if errUs != nil || errDs != nil || errDhcp != nil {
+					if errUs != nil {
+						return errUs
 					}
+					if errDs != nil {
+						return errDs
+					}
+					return errDhcp
 				}
 			}
 		}
 	}
+
 	return nil
 }
 
 func (tt TtWorkFlow) ProvisionVodFlow(subs *Subscriber) error {
 	var err error
-	var flowID uint32
 	var gemPortIDs []uint32
+	var errUs, errDs, errDhcp, errIgmp error
+	var flowIDUs, flowIDDs, flowIDDhcp, flowIDIgmp uint64
+	pbitToGem := make(map[uint32]uint32)
+	var pcp uint32
 
 	var allocID = subs.TpInstance[subs.TestConfig.TpIDList[0]].UsScheduler.AllocID
 	for _, gem := range subs.TpInstance[subs.TestConfig.TpIDList[0]].UpstreamGemPortAttributeList {
@@ -432,40 +454,51 @@
 		pBitMap := subs.TpInstance[subs.TestConfig.TpIDList[0]].UpstreamGemPortAttributeList[idx].PbitMap
 		for pos, pbitSet := range strings.TrimPrefix(pBitMap, "0b") {
 			if pbitSet == '1' {
-				pcp := uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
-				if flowID, err = subs.RsrMgr.GetFlowID(context.Background(), uint32(subs.PonIntf)); err != nil {
+				pcp = uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
+				if flowIDUs, err = subs.RsrMgr.GetFlowID(context.Background(), subs.PonIntf); err != nil {
 					return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
-				} else {
-					var errUs, errDs, errDhcp, errIgmp error
-					if errUs = AddTtFlow(subs, VodFlow, Upstream, flowID, allocID, gemID, pcp); errUs != nil {
-						log.Errorw("failed to install US VOIP flow",
-							log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
-					}
-					if errDs = AddTtFlow(subs, VodFlow, Downstream, flowID, allocID, gemID, pcp); errDs != nil {
-						log.Errorw("failed to install DS VOIP flow",
-							log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
-					}
-					if errDhcp = AddFlow(subs, DhcpFlowIPV4, Upstream, flowID, allocID, gemID, pcp); errDhcp != nil {
-						log.Errorw("failed to install US VOIP-DHCP flow",
-							log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
-					}
-					if errIgmp = AddTtFlow(subs, IgmpFlow, Upstream, flowID, allocID, gemID, pcp); errIgmp != nil {
-						log.Errorw("failed to install US VOIP-IGMP flow",
-							log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
-					}
+				}
+				if errUs = AddTtFlow(subs, VodFlow, Upstream, flowIDUs, allocID, gemID, pcp, false,
+					0, pbitToGem); errUs != nil {
+					logger.Errorw(nil, "failed to install US VOIP flow",
+						log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
+				}
+				if flowIDDs, err = subs.RsrMgr.GetFlowID(context.Background(), subs.PonIntf); err != nil {
+					return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
+				}
+				if errDs = AddTtFlow(subs, VodFlow, Downstream, flowIDDs, allocID, gemID, pcp, false,
+					flowIDUs, pbitToGem); errDs != nil {
+					logger.Errorw(nil, "failed to install DS VOIP flow",
+						log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
+				}
+				if flowIDDhcp, err = subs.RsrMgr.GetFlowID(context.Background(), subs.PonIntf); err != nil {
+					return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
+				}
+				if errDhcp = AddFlow(subs, DhcpFlowIPV4, Upstream, flowIDDhcp, allocID, gemID, pcp, false,
+					0, pbitToGem); errDhcp != nil {
+					logger.Errorw(nil, "failed to install US VOIP-DHCP flow",
+						log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
+				}
+				if flowIDIgmp, err = subs.RsrMgr.GetFlowID(context.Background(), subs.PonIntf); err != nil {
+					return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
+				}
+				if errIgmp = AddTtFlow(subs, IgmpFlow, Upstream, flowIDIgmp, allocID, gemID, pcp, false,
+					0, pbitToGem); errIgmp != nil {
+					logger.Errorw(nil, "failed to install US VOIP-IGMP flow",
+						log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
+				}
 
-					if errUs != nil || errDs != nil || errDhcp != nil || errIgmp != nil {
-						if errUs != nil {
-							return errUs
-						}
-						if errDs != nil {
-							return errDs
-						}
-						if errDhcp != nil {
-							return errDhcp
-						}
-						return errIgmp
+				if errUs != nil || errDs != nil || errDhcp != nil || errIgmp != nil {
+					if errUs != nil {
+						return errUs
 					}
+					if errDs != nil {
+						return errDs
+					}
+					if errDhcp != nil {
+						return errDhcp
+					}
+					return errIgmp
 				}
 			}
 		}
@@ -475,8 +508,10 @@
 
 func (tt TtWorkFlow) ProvisionMgmtFlow(subs *Subscriber) error {
 	var err error
-	var flowID uint32
+	pbitToGem := make(map[uint32]uint32)
+	var flowIDUs, flowIDDs, flowIDDhcp uint64
 	var gemPortIDs []uint32
+	var pcp uint32
 
 	var allocID = subs.TpInstance[subs.TestConfig.TpIDList[0]].UsScheduler.AllocID
 	for _, gem := range subs.TpInstance[subs.TestConfig.TpIDList[0]].UpstreamGemPortAttributeList {
@@ -487,33 +522,41 @@
 		pBitMap := subs.TpInstance[subs.TestConfig.TpIDList[0]].UpstreamGemPortAttributeList[idx].PbitMap
 		for pos, pbitSet := range strings.TrimPrefix(pBitMap, "0b") {
 			if pbitSet == '1' {
-				pcp := uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
-				if flowID, err = subs.RsrMgr.GetFlowID(context.Background(), uint32(subs.PonIntf)); err != nil {
+				pcp = uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
+				var errUs, errDs, errDhcp error
+				if flowIDUs, err = subs.RsrMgr.GetFlowID(context.Background(), subs.PonIntf); err != nil {
 					return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
-				} else {
-					var errUs, errDs, errDhcp error
-					if errUs = AddTtFlow(subs, MgmtFlow, Upstream, flowID, allocID, gemID, pcp); errUs != nil {
-						log.Errorw("failed to install US MGMT flow",
-							log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
-					}
-					if errDs = AddTtFlow(subs, MgmtFlow, Downstream, flowID, allocID, gemID, pcp); errDs != nil {
-						log.Errorw("failed to install DS MGMT flow",
-							log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
-					}
-					if errDhcp = AddFlow(subs, DhcpFlowIPV4, Upstream, flowID, allocID, gemID, pcp); errDhcp != nil {
-						log.Errorw("failed to install US MGMT-DHCP flow",
-							log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
-					}
+				}
+				if errUs = AddTtFlow(subs, MgmtFlow, Upstream, flowIDUs, allocID, gemID, pcp, false,
+					0, pbitToGem); errUs != nil {
+					logger.Errorw(nil, "failed to install US MGMT flow",
+						log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
+				}
+				if flowIDDs, err = subs.RsrMgr.GetFlowID(context.Background(), subs.PonIntf); err != nil {
+					return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
+				}
+				if errDs = AddTtFlow(subs, MgmtFlow, Downstream, flowIDDs, allocID, gemID, pcp, false,
+					flowIDUs, pbitToGem); errDs != nil {
+					logger.Errorw(nil, "failed to install DS MGMT flow",
+						log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
+				}
+				if flowIDDhcp, err = subs.RsrMgr.GetFlowID(context.Background(), subs.PonIntf); err != nil {
+					return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
+				}
+				if errDhcp = AddFlow(subs, DhcpFlowIPV4, Upstream, flowIDDhcp, allocID, gemID, pcp, false,
+					0, pbitToGem); errDhcp != nil {
+					logger.Errorw(nil, "failed to install US MGMT-DHCP flow",
+						log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
+				}
 
-					if errUs != nil || errDs != nil || errDhcp != nil {
-						if errUs != nil {
-							return errUs
-						}
-						if errDs != nil {
-							return errDs
-						}
-						return errDhcp
+				if errUs != nil || errDs != nil || errDhcp != nil {
+					if errUs != nil {
+						return errUs
 					}
+					if errDs != nil {
+						return errDs
+					}
+					return errDhcp
 				}
 			}
 		}
@@ -536,7 +579,7 @@
 	grp.GemPortID = 4069
 	grp.SchedPolicy = tp_pb.SchedulingPolicy_WRR
 
-	log.Debugw("Group data", log.Fields{"OnuID": subs.OnuID, "GroupID": grp.GroupID, "numOfONUsPerPon": numOfONUsPerPon})
+	logger.Debugw(nil, "Group data", log.Fields{"OnuID": subs.OnuID, "GroupID": grp.GroupID, "numOfONUsPerPon": numOfONUsPerPon})
 
 	grp.GroupID = subs.OnuID
 
@@ -562,7 +605,7 @@
 	err = AddMulticastQueueFlow(&grp)
 
 	if err != nil {
-		log.Errorw("Failed to add multicast flow", log.Fields{"error": err})
+		logger.Errorw(nil, "Failed to add multicast flow", log.Fields{"error": err})
 	}
 
 	return err
diff --git a/core/utils.go b/core/utils.go
index b239443..1bcdedd 100644
--- a/core/utils.go
+++ b/core/utils.go
@@ -19,8 +19,8 @@
 import (
 	"fmt"
 
-	"github.com/opencord/voltha-lib-go/v3/pkg/log"
-	"github.com/opencord/voltha-protos/v3/go/openolt"
+	"github.com/opencord/voltha-lib-go/v4/pkg/log"
+	"github.com/opencord/voltha-protos/v4/go/openolt"
 )
 
 type DtStagKey struct {
@@ -34,7 +34,6 @@
 var TtCtag map[uint32]uint32
 
 func init() {
-	_, _ = log.AddPackage(log.JSON, log.DebugLevel, nil)
 	AttCtag = make(map[uint32]uint32)
 	DtCtag = make(map[uint32]uint32)
 	DtStag = make(map[DtStagKey]uint32)
@@ -59,9 +58,9 @@
 
 	vendorSpecificId += 1
 	vs := []byte(fmt.Sprint(vendorSpecificId))
-	// log.Infow("vendor-id-and-vendor-specific", log.Fields{"vi":vi, "vs":vs})
+	// logger.Infow(nil, "vendor-id-and-vendor-specific", log.Fields{"vi":vi, "vs":vs})
 	sn := &openolt.SerialNumber{VendorId: vi, VendorSpecific: vs}
-	// log.Infow("serial-num", log.Fields{"sn":sn})
+	// logger.Infow(nil, "serial-num", log.Fields{"sn":sn})
 
 	return sn
 }
@@ -70,7 +69,7 @@
 func MkUniPortNum(intfID, onuID, uniID uint32) uint32 {
 	var limit = int(onuID)
 	if limit > MaxOnusPerPon {
-		log.Warn("Warning: exceeded the MAX ONUS per PON")
+		logger.Warn(nil, "Warning: exceeded the MAX ONUS per PON")
 	}
 	return (intfID << (bitsForUniID + bitsForONUID)) | (onuID << bitsForUniID) | uniID
 }
@@ -144,7 +143,7 @@
 	case "TT":
 		return GetTtCtag(ponIntf)
 	default:
-		log.Errorw("unknown-workflowname", log.Fields{"workflow": workFlowName})
+		logger.Errorw(nil, "unknown-workflowname", log.Fields{"workflow": workFlowName})
 	}
 	return 0
 }
@@ -158,7 +157,7 @@
 	case "TT":
 		return GetTtStag(ponIntf)
 	default:
-		log.Errorw("unknown-workflowname", log.Fields{"workflow": workFlowName})
+		logger.Errorw(nil, "unknown-workflowname", log.Fields{"workflow": workFlowName})
 	}
 	return 0
 }
diff --git a/core/workflow_manager.go b/core/workflow_manager.go
index e9bb24e..f642eb5 100644
--- a/core/workflow_manager.go
+++ b/core/workflow_manager.go
@@ -20,14 +20,10 @@
 	"errors"
 
 	"github.com/opencord/openolt-scale-tester/config"
-	"github.com/opencord/voltha-lib-go/v3/pkg/log"
-	oop "github.com/opencord/voltha-protos/v3/go/openolt"
+	"github.com/opencord/voltha-lib-go/v4/pkg/log"
+	oop "github.com/opencord/voltha-protos/v4/go/openolt"
 )
 
-func init() {
-	_, _ = log.AddPackage(log.JSON, log.DebugLevel, nil)
-}
-
 type WorkFlow interface {
 	ProvisionScheds(subs *Subscriber) error
 	ProvisionQueues(subs *Subscriber) error
@@ -107,24 +103,24 @@
 		}
 	}
 
-	log.Infow("subscriber-provision-completed-from-onu-manager", log.Fields{"subsName": subs.SubscriberName})
+	logger.Infow(nil, "subscriber-provision-completed-from-onu-manager", log.Fields{"subsName": subs.SubscriberName})
 	subs.Reason = ReasonCodeToReasonString(SUBSCRIBER_PROVISION_SUCCESS)
 }
 
 func getWorkFlow(subs *Subscriber) WorkFlow {
 	switch subs.TestConfig.WorkflowName {
 	case "ATT":
-		log.Info("chosen-att-workflow")
+		logger.Info(nil, "chosen-att-workflow")
 		return AttWorkFlow{}
 	case "DT":
-		log.Info("chosen-dt-workflow")
+		logger.Info(nil, "chosen-dt-workflow")
 		return DtWorkFlow{}
 	case "TT":
-		log.Info("chosen-tt-workflow")
+		logger.Info(nil, "chosen-tt-workflow")
 		return TtWorkFlow{}
 	// TODO: Add new workflow here
 	default:
-		log.Errorw("operator-workflow-not-supported-yet", log.Fields{"workflowName": subs.TestConfig.WorkflowName})
+		logger.Errorw(nil, "operator-workflow-not-supported-yet", log.Fields{"workflowName": subs.TestConfig.WorkflowName})
 	}
 	return nil
 }
@@ -135,22 +131,22 @@
 	switch config.WorkflowName {
 	case "ATT":
 		if err := ProvisionAttNniTrapFlow(oo, config, rsrMgr); err != nil {
-			log.Error("error-installing-flow", log.Fields{"err": err})
+			logger.Error(nil, "error-installing-flow", log.Fields{"err": err})
 			return err
 		}
 	case "DT":
 		if err := ProvisionDtNniTrapFlow(oo, config, rsrMgr); err != nil {
-			log.Error("error-installing-flow", log.Fields{"err": err})
+			logger.Error(nil, "error-installing-flow", log.Fields{"err": err})
 			return err
 		}
 	case "TT":
 		if err := ProvisionTtNniTrapFlow(oo, config, rsrMgr); err != nil {
-			log.Error("error-installing-flow", log.Fields{"err": err})
+			logger.Error(nil, "error-installing-flow", log.Fields{"err": err})
 			return err
 		}
 	// TODO: Add new items here
 	default:
-		log.Errorw("operator-workflow-not-supported-yet", log.Fields{"workflowName": config.WorkflowName})
+		logger.Errorw(nil, "operator-workflow-not-supported-yet", log.Fields{"workflowName": config.WorkflowName})
 		return errors.New("workflow-not-supported")
 	}
 	return nil
diff --git a/core/workflow_utils.go b/core/workflow_utils.go
index a223c02..818b3ec 100644
--- a/core/workflow_utils.go
+++ b/core/workflow_utils.go
@@ -22,10 +22,9 @@
 	"time"
 
 	"github.com/opencord/openolt-scale-tester/config"
-	"github.com/opencord/voltha-lib-go/v3/pkg/log"
-	"github.com/opencord/voltha-lib-go/v3/pkg/ponresourcemanager"
-	oop "github.com/opencord/voltha-protos/v3/go/openolt"
-	tp_pb "github.com/opencord/voltha-protos/v3/go/tech_profile"
+	"github.com/opencord/voltha-lib-go/v4/pkg/log"
+	oop "github.com/opencord/voltha-protos/v4/go/openolt"
+	tp_pb "github.com/opencord/voltha-protos/v4/go/tech_profile"
 	"golang.org/x/net/context"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
@@ -102,14 +101,14 @@
 
 	if direction == tp_pb.Direction_DOWNSTREAM {
 		SchedCfg, err = subs.RsrMgr.ResourceMgrs[subs.PonIntf].TechProfileMgr.
-			GetDsScheduler(subs.TpInstance[subs.TestConfig.TpIDList[0]])
+			GetDsScheduler(nil, subs.TpInstance[subs.TestConfig.TpIDList[0]])
 	} else {
 		SchedCfg, err = subs.RsrMgr.ResourceMgrs[subs.PonIntf].TechProfileMgr.
-			GetUsScheduler(subs.TpInstance[subs.TestConfig.TpIDList[0]])
+			GetUsScheduler(nil, subs.TpInstance[subs.TestConfig.TpIDList[0]])
 	}
 
 	if err != nil {
-		log.Errorw("Failed to create traffic schedulers", log.Fields{"direction": direction, "error": err})
+		logger.Errorw(nil, "Failed to create traffic schedulers", log.Fields{"direction": direction, "error": err})
 		return nil
 	}
 
@@ -132,13 +131,13 @@
 func getTrafficQueues(subs *Subscriber, direction tp_pb.Direction) []*tp_pb.TrafficQueue {
 
 	trafficQueues, err := subs.RsrMgr.ResourceMgrs[subs.PonIntf].TechProfileMgr.
-		GetTrafficQueues(subs.TpInstance[subs.TestConfig.TpIDList[0]], direction)
+		GetTrafficQueues(nil, subs.TpInstance[subs.TestConfig.TpIDList[0]], direction)
 
 	if err == nil {
 		return trafficQueues
 	}
 
-	log.Errorw("Failed to create traffic queues", log.Fields{"direction": direction, "error": err})
+	logger.Errorw(nil, "Failed to create traffic queues", log.Fields{"direction": direction, "error": err})
 	return nil
 }
 
@@ -178,21 +177,21 @@
 			actionInfo.Cmd = &actionCmd
 			actionInfo.OVid = subs.Stag
 		default:
-			log.Errorw("Unsupported flow type", log.Fields{"flowtype": flowType,
+			logger.Errorw(nil, "Unsupported flow type", log.Fields{"flowtype": flowType,
 				"direction": direction})
 		}
 	} else if direction == Downstream {
 		switch flowType {
 		case EapolFlow:
-			log.Errorw("Downstream EAP flows are not required instead controller "+
+			logger.Errorw(nil, "Downstream EAP flows are not required instead controller "+
 				"packet outs EAP response directly to onu in downstream", log.Fields{"flowtype": flowType,
 				"direction": direction})
 		case DhcpFlowIPV4:
-			log.Errorw("Downstream DHCPIPV4 flows are not required instead we have "+
+			logger.Errorw(nil, "Downstream DHCPIPV4 flows are not required instead we have "+
 				"NNI trap flows already installed", log.Fields{"flowtype": flowType,
 				"direction": direction})
 		case DhcpFlowIPV6:
-			log.Errorw("Downstream DHCPIPV6 flows are not required instead we have "+
+			logger.Errorw(nil, "Downstream DHCPIPV6 flows are not required instead we have "+
 				"NNI trap flows already installed", log.Fields{"flowtype": flowType,
 				"direction": direction})
 		case HsiaFlow:
@@ -203,51 +202,54 @@
 			actionInfo.Cmd = &actionCmd
 			actionInfo.OVid = subs.Stag
 		default:
-			log.Errorw("Unsupported flow type", log.Fields{"flowtype": flowType,
+			logger.Errorw(nil, "Unsupported flow type", log.Fields{"flowtype": flowType,
 				"direction": direction})
 		}
 	}
 	return flowClassifier, actionInfo
 }
 
-func AddFlow(subs *Subscriber, flowType string, direction string, flowID uint32,
-	allocID uint32, gemID uint32, pcp uint32) error {
-	log.Infow("add-flow", log.Fields{"WorkFlow": subs.TestConfig.WorkflowName, "FlowType": flowType,
+func AddFlow(subs *Subscriber, flowType string, direction string, flowID uint64,
+	allocID uint32, gemID uint32, pcp uint32, replicateFlow bool, symmetricFlowID uint64,
+	pbitToGem map[uint32]uint32) error {
+	logger.Infow(nil, "add-flow", log.Fields{"WorkFlow": subs.TestConfig.WorkflowName, "FlowType": flowType,
 		"direction": direction, "flowID": flowID})
 	var err error
 
 	flowClassifier, actionInfo := FormatClassfierAction(flowType, direction, subs)
-	// Update the o_pbit for which this flow has to be classified
-	flowClassifier.OPbits = pcp
+	// Update the o_pbit (if valid) for which this flow has to be classified
+	if pcp != 0xff {
+		flowClassifier.OPbits = pcp
+	}
 	flow := oop.Flow{AccessIntfId: int32(subs.PonIntf), OnuId: int32(subs.OnuID),
 		UniId: int32(subs.UniID), FlowId: flowID,
 		FlowType: direction, AllocId: int32(allocID), GemportId: int32(gemID),
 		Classifier: &flowClassifier, Action: &actionInfo,
-		Priority: 1000, PortNo: subs.UniPortNo}
+		Priority: 1000, PortNo: subs.UniPortNo, SymmetricFlowId: symmetricFlowID,
+		ReplicateFlow: replicateFlow, PbitToGemport: pbitToGem}
 
 	_, err = subs.OpenOltClient.FlowAdd(context.Background(), &flow)
 
 	st, _ := status.FromError(err)
 	if st.Code() == codes.AlreadyExists {
-		log.Debugw("Flow already exists", log.Fields{"err": err, "deviceFlow": flow})
+		logger.Debugw(nil, "Flow already exists", log.Fields{"err": err, "deviceFlow": flow})
 		return nil
 	}
 
 	if err != nil {
-		log.Errorw("Failed to Add flow to device", log.Fields{"err": err, "deviceFlow": flow})
+		logger.Errorw(nil, "Failed to Add flow to device", log.Fields{"err": err, "deviceFlow": flow})
 		return errors.New(ReasonCodeToReasonString(FLOW_ADD_FAILED))
 	}
-	log.Debugw("Flow added to device successfully ", log.Fields{"flow": flow})
+	logger.Debugw(nil, "Flow added to device successfully ", log.Fields{"flow": flow})
 
 	return nil
 }
 
 func AddLldpFlow(oo oop.OpenoltClient, config *config.OpenOltScaleTesterConfig, rsrMgr *OpenOltResourceMgr) error {
-	var flowID []uint32
+	var flowID uint64
 	var err error
 
-	if flowID, err = rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].GetResourceID(context.Background(), uint32(config.NniIntfID),
-		ponresourcemanager.FLOW_ID, 1); err != nil {
+	if flowID, err = rsrMgr.GetFlowID(context.Background(), uint32(config.NniIntfID)); err != nil {
 		return err
 	}
 
@@ -255,7 +257,7 @@
 	actionCmd := &oop.ActionCmd{TrapToHost: true}
 	actionInfo := &oop.Action{Cmd: actionCmd}
 
-	flow := oop.Flow{AccessIntfId: -1, OnuId: -1, UniId: -1, FlowId: flowID[0],
+	flow := oop.Flow{AccessIntfId: -1, OnuId: -1, UniId: -1, FlowId: flowID,
 		FlowType: "downstream", AllocId: -1, GemportId: -1,
 		Classifier: flowClassifier, Action: actionInfo,
 		Priority: 1000, PortNo: uint32(config.NniIntfID)}
@@ -264,17 +266,15 @@
 
 	st, _ := status.FromError(err)
 	if st.Code() == codes.AlreadyExists {
-		log.Debugw("Flow already exists", log.Fields{"err": err, "deviceFlow": flow})
+		logger.Debugw(nil, "Flow already exists", log.Fields{"err": err, "deviceFlow": flow})
 		return nil
 	}
 
 	if err != nil {
-		log.Errorw("Failed to Add LLDP flow to device", log.Fields{"err": err, "deviceFlow": flow})
-		rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].FreeResourceID(context.Background(), uint32(config.NniIntfID),
-			ponresourcemanager.FLOW_ID, flowID)
+		logger.Errorw(nil, "Failed to Add LLDP flow to device", log.Fields{"err": err, "deviceFlow": flow})
 		return err
 	}
-	log.Debugw("LLDP flow added to device successfully ", log.Fields{"flow": flow})
+	logger.Debugw(nil, "LLDP flow added to device successfully ", log.Fields{"flow": flow})
 
 	return nil
 }
@@ -312,11 +312,11 @@
 	var res *oop.Empty
 
 	if res, err = oop.OpenoltClient.PerformGroupOperation(oo, context.Background(), groupCfg); err != nil {
-		log.Errorw("Failed to perform - PerformGroupOperation()", log.Fields{"err": err})
+		logger.Errorw(nil, "Failed to perform - PerformGroupOperation()", log.Fields{"err": err})
 		return nil, err
 	}
 
-	log.Info("Successfully called - PerformGroupOperation()")
+	logger.Info(nil, "Successfully called - PerformGroupOperation()")
 
 	return res, nil
 }
@@ -324,7 +324,7 @@
 func CreateGroup(grp *GroupData) (*oop.Empty, error) {
 	var groupCfg oop.Group
 
-	log.Infow("creating group", log.Fields{"GroupID": grp.GroupID})
+	logger.Infow(nil, "creating group", log.Fields{"GroupID": grp.GroupID})
 
 	groupCfg.Command = oop.Group_SET_MEMBERS
 	groupCfg.GroupId = grp.GroupID
@@ -333,7 +333,7 @@
 }
 
 func OpMulticastTrafficQueue(grp *GroupData, isCreating bool) (*oop.Empty, error) {
-	log.Infow("operating on multicast traffic queue", log.Fields{"Creating": isCreating, "GroupID": grp.GroupID})
+	logger.Infow(nil, "operating on multicast traffic queue", log.Fields{"Creating": isCreating, "GroupID": grp.GroupID})
 
 	oo := grp.Subs.OpenOltClient
 
@@ -360,35 +360,34 @@
 
 	if isCreating {
 		if res, err = oop.OpenoltClient.CreateTrafficQueues(oo, context.Background(), &request); err != nil {
-			log.Errorw("Failed to perform - CreateTrafficQueues()", log.Fields{"err": err})
+			logger.Errorw(nil, "Failed to perform - CreateTrafficQueues()", log.Fields{"err": err})
 			return nil, err
 		}
 
-		log.Info("Successfully called - CreateTrafficQueues()")
+		logger.Info(nil, "Successfully called - CreateTrafficQueues()")
 	} else {
 		if res, err = oop.OpenoltClient.RemoveTrafficQueues(oo, context.Background(), &request); err != nil {
-			log.Errorw("Failed to perform - RemoveTrafficQueues()", log.Fields{"err": err})
+			logger.Errorw(nil, "Failed to perform - RemoveTrafficQueues()", log.Fields{"err": err})
 			return nil, err
 		}
 
-		log.Info("Successfully called - RemoveTrafficQueues()")
+		logger.Info(nil, "Successfully called - RemoveTrafficQueues()")
 	}
 
 	return res, nil
 }
 
 func AddMulticastFlow(grp *GroupData) error {
-	log.Infow("add multicast flow", log.Fields{"GroupID": grp.GroupID})
+	logger.Infow(nil, "add multicast flow", log.Fields{"GroupID": grp.GroupID})
 
 	oo := grp.Subs.OpenOltClient
 	config := grp.Subs.TestConfig
 	rsrMgr := grp.Subs.RsrMgr
 
-	var flowID []uint32
+	var flowID uint64
 	var err error
 
-	if flowID, err = rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].GetResourceID(context.Background(), uint32(config.NniIntfID),
-		ponresourcemanager.FLOW_ID, 1); err != nil {
+	if flowID, err = rsrMgr.GetFlowID(context.Background(), uint32(config.NniIntfID)); err != nil {
 		return err
 	}
 
@@ -400,7 +399,7 @@
 		DstMac:     GenerateMulticastMac(grp.Subs.OnuID, grp.GroupID),
 		PktTagType: DoubleTag}
 
-	flow := oop.Flow{AccessIntfId: int32(grp.Subs.PonIntf), OnuId: int32(grp.Subs.OnuID), UniId: int32(grp.Subs.UniID), FlowId: flowID[0],
+	flow := oop.Flow{AccessIntfId: int32(grp.Subs.PonIntf), OnuId: int32(grp.Subs.OnuID), UniId: int32(grp.Subs.UniID), FlowId: flowID,
 		FlowType: "multicast", AllocId: int32(grp.AllocID), GemportId: int32(grp.GemPortID),
 		Classifier: flowClassifier, Priority: int32(grp.Priority), PortNo: uint32(grp.Subs.UniPortNo), GroupId: uint32(grp.GroupID)}
 
@@ -408,24 +407,22 @@
 
 	st, _ := status.FromError(err)
 	if st.Code() == codes.AlreadyExists {
-		log.Debugw("Flow already exists", log.Fields{"err": err, "deviceFlow": flow})
+		logger.Debugw(nil, "Flow already exists", log.Fields{"err": err, "deviceFlow": flow})
 		return nil
 	}
 
 	if err != nil {
-		log.Errorw("Failed to add multicast flow to device", log.Fields{"err": err, "deviceFlow": flow})
-		rsrMgr.ResourceMgrs[uint32(grp.Subs.PonIntf)].FreeResourceID(context.Background(), uint32(config.NniIntfID),
-			ponresourcemanager.FLOW_ID, flowID)
+		logger.Errorw(nil, "Failed to add multicast flow to device", log.Fields{"err": err, "deviceFlow": flow})
 		return err
 	}
 
-	log.Debugw("Multicast flow added to device successfully ", log.Fields{"flow": flow})
+	logger.Debugw(nil, "Multicast flow added to device successfully ", log.Fields{"flow": flow})
 
 	return nil
 }
 
 func AddMulticastSched(grp *GroupData) error {
-	log.Infow("creating multicast sched", log.Fields{"GroupID": grp.GroupID})
+	logger.Infow(nil, "creating multicast sched", log.Fields{"GroupID": grp.GroupID})
 
 	SchedCfg := &tp_pb.SchedulerConfig{
 		Direction:    tp_pb.Direction_DOWNSTREAM,
@@ -448,18 +445,18 @@
 		GetTrafficScheduler(grp.Subs.TpInstance[grp.Subs.TestConfig.TpIDList[0]], SchedCfg, TfShInfo)}
 
 	if TrafficSched == nil {
-		log.Error("Create scheduler for multicast traffic failed")
+		logger.Error(nil, "Create scheduler for multicast traffic failed")
 		return errors.New(ReasonCodeToReasonString(SCHED_CREATION_FAILED))
 	}
 
-	log.Debugw("Sending Traffic scheduler create to device",
+	logger.Debugw(nil, "Sending Traffic scheduler create to device",
 		log.Fields{"Direction": tp_pb.Direction_DOWNSTREAM, "TrafficScheds": TrafficSched})
 
 	if _, err := grp.Subs.OpenOltClient.CreateTrafficSchedulers(context.Background(), &tp_pb.TrafficSchedulers{
 		IntfId: grp.Subs.PonIntf, OnuId: grp.Subs.OnuID,
 		UniId: grp.Subs.UniID, PortNo: grp.Subs.UniPortNo,
 		TrafficScheds: TrafficSched}); err != nil {
-		log.Errorw("Failed to create traffic schedulers", log.Fields{"error": err})
+		logger.Errorw(nil, "Failed to create traffic schedulers", log.Fields{"error": err})
 		return errors.New(ReasonCodeToReasonString(SCHED_CREATION_FAILED))
 	}
 
@@ -467,7 +464,7 @@
 }
 
 func OpMemberToGroup(grp *GroupData, isAdding bool) (*oop.Empty, error) {
-	log.Infow("operating on group", log.Fields{"Adding": isAdding})
+	logger.Infow(nil, "operating on group", log.Fields{"Adding": isAdding})
 
 	var groupCfg oop.Group
 
@@ -498,40 +495,40 @@
 func AddMulticastQueueFlow(grp *GroupData) error {
 	var err error
 
-	log.Debugw("Create multicast queue flow", log.Fields{"GroupID": grp.GroupID, "AddGroup": grp.AddGroup,
+	logger.Debugw(nil, "Create multicast queue flow", log.Fields{"GroupID": grp.GroupID, "AddGroup": grp.AddGroup,
 		"AddFlow": grp.AddFlow, "AddSched": grp.AddSched, "AddQueue": grp.AddQueue, "AddMember": grp.AddMember})
 
 	if grp.AddGroup {
 		if _, err = CreateGroup(grp); err != nil {
-			log.Error("Failed to add group to device")
+			logger.Error(nil, "Failed to add group to device")
 			return err
 		}
 	}
 
 	if grp.AddFlow {
 		if err = AddMulticastFlow(grp); err != nil {
-			log.Error("Failed to add multicast flow to device")
+			logger.Error(nil, "Failed to add multicast flow to device")
 			return err
 		}
 	}
 
 	if grp.AddSched {
 		if err = AddMulticastSched(grp); err != nil {
-			log.Error("Failed to add multicast sched to device")
+			logger.Error(nil, "Failed to add multicast sched to device")
 			return err
 		}
 	}
 
 	if grp.AddQueue {
 		if _, err = OpMulticastTrafficQueue(grp, true); err != nil {
-			log.Error("Failed to add multicast queue to device")
+			logger.Error(nil, "Failed to add multicast queue to device")
 			return err
 		}
 	}
 
 	if grp.AddMember {
 		if _, err = OpMemberToGroup(grp, true); err != nil {
-			log.Error("Failed to add member to group")
+			logger.Error(nil, "Failed to add member to group")
 			return err
 		}
 	}
@@ -542,18 +539,18 @@
 func CreateTrafficSchedWithRetry(OpenOltClient oop.OpenoltClient, sched *oop.TrafficSchedulers) error {
 	maxRetry := 20
 	if _, err := OpenOltClient.CreateTrafficSchedulers(context.Background(), sched); err == nil {
-		log.Info("succeeded in first attempt")
+		logger.Info(nil, "succeeded in first attempt")
 		return nil
 	} else {
-		log.Info("going for a retry")
+		logger.Info(nil, "going for a retry")
 	}
 	for i := 0; i < maxRetry; i++ {
 		if _, err := OpenOltClient.CreateTrafficSchedulers(context.Background(), sched); err != nil {
-			log.Error("retying after delay")
+			logger.Error(nil, "retying after delay")
 			time.Sleep(50 * time.Millisecond)
 			continue
 		} else {
-			log.Infow("succeeded in retry iteration=%d!!", log.Fields{"i": i})
+			logger.Infow(nil, "succeeded in retry iteration=%d!!", log.Fields{"i": i})
 			return nil
 		}
 	}
@@ -564,7 +561,7 @@
 func CreateTrafficQueuesWithRetry(OpenOltClient oop.OpenoltClient, queue *oop.TrafficQueues) error {
 	maxRetry := 20
 	if _, err := OpenOltClient.CreateTrafficQueues(context.Background(), queue); err == nil {
-		log.Info("succeeded in first attempt")
+		logger.Info(nil, "succeeded in first attempt")
 		return nil
 	}
 	for i := 0; i < maxRetry; i++ {
@@ -572,7 +569,7 @@
 			time.Sleep(50 * time.Millisecond)
 			continue
 		} else {
-			log.Infow("succeeded in retry iteration=%d!!", log.Fields{"i": i})
+			logger.Infow(nil, "succeeded in retry iteration=%d!!", log.Fields{"i": i})
 			return nil
 		}
 	}
@@ -589,7 +586,7 @@
 
 	st, _ := status.FromError(err)
 	if st.Code() == codes.AlreadyExists {
-		log.Debugw("Flow already exists", log.Fields{"err": err, "deviceFlow": flow})
+		logger.Debugw(nil, "Flow already exists", log.Fields{"err": err, "deviceFlow": flow})
 		return nil
 	}
 	if st.Code() == codes.ResourceExhausted {
@@ -597,17 +594,17 @@
 			_, err = OpenOltClient.FlowAdd(context.Background(), flow)
 			st, _ := status.FromError(err)
 			if st.Code() == codes.ResourceExhausted {
-				log.Error("flow-install-failed--retrying")
+				logger.Error(nil, "flow-install-failed--retrying")
 				continue
 			} else if st.Code() == codes.OK {
-				log.Infow("flow-install-succeeded-on-retry", log.Fields{"i": i, "flow": flow})
+				logger.Infow(nil, "flow-install-succeeded-on-retry", log.Fields{"i": i, "flow": flow})
 				return nil
 			}
 		}
 
 	}
 
-	log.Debugw("Flow install failed on all retries ", log.Fields{"flow": flow})
+	logger.Debugw(nil, "Flow install failed on all retries ", log.Fields{"flow": flow})
 
 	return err
 }