VOL-1624 Support for tech-profile creation on the first flow that references the tp-id (in write-metadata)

Getting meter from flow itself and bug fixes

Bug fix for dhcp packet-out

Change-Id: Ia466988bfdbfe49fd9a44729a4ba4a30fd991c54
diff --git a/adaptercore/openolt_flowmgr.go b/adaptercore/openolt_flowmgr.go
index 498f6e7..38d5c7e 100644
--- a/adaptercore/openolt_flowmgr.go
+++ b/adaptercore/openolt_flowmgr.go
@@ -23,6 +23,8 @@
 	"encoding/json"
 	"errors"
 	"fmt"
+	"math/big"
+
 	"github.com/opencord/voltha-go/common/log"
 	tp "github.com/opencord/voltha-go/common/techprofile"
 	"github.com/opencord/voltha-go/rw_core/utils"
@@ -30,8 +32,8 @@
 	ic "github.com/opencord/voltha-protos/go/inter_container"
 	ofp "github.com/opencord/voltha-protos/go/openflow_13"
 	openoltpb2 "github.com/opencord/voltha-protos/go/openolt"
+	tp_pb "github.com/opencord/voltha-protos/go/tech_profile"
 	"github.com/opencord/voltha-protos/go/voltha"
-	"math/big"
 	//deepcopy "github.com/getlantern/deepcopy"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
@@ -46,6 +48,9 @@
 	//EapolFlow flow category
 	EapolFlow = "EAPOL_FLOW"
 
+	//DhcpFlow flow category
+	DhcpFlow = "DHCP_FLOW"
+
 	//IPProtoDhcp flow category
 	IPProtoDhcp = 17
 
@@ -117,6 +122,14 @@
 	PushVlan = "push_vlan"
 	//TrapToHost constant
 	TrapToHost = "trap_to_host"
+	//MaxMeterBand constant
+	MaxMeterBand = 2
+	//VlanPCPMask contant
+	VlanPCPMask = 0xFF
+	//VlanvIDMask constant
+	VlanvIDMask = 0xFFF
+	//MaxPonPorts constant
+	MaxPonPorts = 16
 )
 
 type onuInfo struct {
@@ -159,6 +172,7 @@
 	var flowMgr OpenOltFlowMgr
 	flowMgr.deviceHandler = dh
 	flowMgr.resourceMgr = rsrcMgr
+	flowMgr.techprofile = make([]*tp.TechProfileMgr, MaxPonPorts)
 	if err := flowMgr.populateTechProfilePerPonPort(); err != nil {
 		log.Error("Error while populating tech profile mgr\n")
 		return nil
@@ -197,138 +211,337 @@
 	log.Debugw("updated Stored flow info", log.Fields{"storedDeviceFlows": f.storedDeviceFlows})
 }
 
-func (f *OpenOltFlowMgr) divideAndAddFlow(intfID uint32, onuID uint32, uniID uint32, portNo uint32, classifierInfo map[string]interface{}, actionInfo map[string]interface{}, flow *ofp.OfpFlowStats) {
-	var allocID []uint32
+func (f *OpenOltFlowMgr) divideAndAddFlow(intfID uint32, onuID uint32, uniID uint32, portNo uint32, classifierInfo map[string]interface{}, actionInfo map[string]interface{}, flow *ofp.OfpFlowStats, TpId uint32, UsMeterId uint32, DsMeterId uint32, flowMetadata *voltha.FlowMetadata) {
+	var allocId []uint32
 	var gemPorts []uint32
-	var uni string
+	var gemPort uint32
+	var TpInst *tp.TechProfile
 
-	log.Infow("Dividing flow", log.Fields{"intfId": intfID, "onuId": onuID, "uniId": uniID, "portNo": portNo, "classifier": classifierInfo, "action": actionInfo})
-
-	log.Infow("sorting flow", log.Fields{"intfId": intfID, "onuId": onuID, "uniId": uniID, "portNo": portNo,
-		"classifierInfo": classifierInfo, "actionInfo": actionInfo})
-
+	log.Infow("Dividing flow", log.Fields{"intfId": intfID, "onuId": onuID, "uniId": uniID, "portNo": portNo,
+		"classifier": classifierInfo, "action": actionInfo, "UsMeterId": UsMeterId, "DsMeterId": DsMeterId, "TpId": TpId})
 	// only create tcont/gemports if there is actually an onu id.  otherwise BAL throws an error.  Usually this
 	// is because the flow is an NNI flow and there would be no onu resources associated with it
 	// TODO: properly deal with NNI flows
-	if onuID > 0 {
-		uni = getUniPortPath(intfID, onuID, uniID)
-		log.Debugw("Uni port name", log.Fields{"uni": uni})
-		allocID, gemPorts = f.createTcontGemports(intfID, onuID, uniID, uni, portNo, flow.GetTableId())
-		if allocID == nil || gemPorts == nil {
-			log.Error("alloc-id-gem-ports-unavailable")
-			return
-		}
-		log.Debugw("Generated required alloc and gemport ids", log.Fields{"alloc_id": allocID, "gemPorts": gemPorts})
-	} else {
+	if onuID <= 0 {
 		log.Errorw("No onu id for flow", log.Fields{"portNo": portNo, "classifer": classifierInfo, "action": actionInfo})
 		return
 	}
 
-	/* Flows can't be added specific to gemport unless p-bits are received.
-	 * Hence adding flows for all gemports
+	uni := getUniPortPath(intfID, onuID, uniID)
+	log.Debugw("Uni port name", log.Fields{"uni": uni})
+	allocId, gemPorts, TpInst = f.createTcontGemports(intfID, onuID, uniID, uni, portNo, TpId, UsMeterId, DsMeterId, flowMetadata)
+	if allocId == nil || gemPorts == nil || TpInst == nil {
+		log.Error("alloc-id-gem-ports-tp-unavailable")
+		return
+	}
+
+	/* Flows can be added specific to gemport if p-bits are received.
+	 * If no pbit mentioned then adding flows for all gemports
 	 */
-	for _, gemPort := range gemPorts {
-		if ipProto, ok := classifierInfo[IPProto]; ok {
-			if ipProto.(uint32) == IPProtoDhcp {
-				log.Info("Adding DHCP flow")
-				f.addDHCPTrapFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocID[0], gemPort)
-			} else if ipProto.(uint32) == IPProtoIgmp {
-				log.Info("igmp flow add ignored, not implemented yet")
+
+	args := make(map[string]uint32)
+	args["intfId"] = intfID
+	args["onuId"] = onuID
+	args["uniId"] = uniID
+	args["portNo"] = portNo
+	args["allocId"] = allocId[0]
+
+	if ipProto, ok := classifierInfo[IPProto]; ok {
+		if ipProto.(uint32) == IPProtoDhcp {
+			log.Info("Adding DHCP flow")
+			if pcp, ok := classifierInfo[VlanPcp]; ok {
+				gemPort = f.techprofile[intfID].GetGemportIDForPbit(TpInst,
+					tp_pb.Direction_UPSTREAM,
+					pcp.(uint32))
+				//Adding DHCP upstream flow
+				f.addDHCPTrapFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocId[0], gemPort)
 			} else {
-				log.Errorw("Invalid-Classifier-to-handle", log.Fields{"classifier": classifierInfo, "action": actionInfo})
-				//return errors.New("Invalid-Classifier-to-handle")
+				//Adding DHCP upstream flow to all gemports
+				installFlowOnAllGemports(f.addDHCPTrapFlow, nil, args, classifierInfo, actionInfo, flow, gemPorts, DhcpFlow)
 			}
-		} else if ethType, ok := classifierInfo[EthType]; ok {
-			if ethType.(uint32) == EapEthType {
-				log.Info("Adding EAPOL flow")
-				f.addEAPOLFlow(intfID, onuID, uniID, portNo, flow, allocID[0], gemPort, DefaultMgmtVlan)
-				if vlan := getSubscriberVlan(utils.GetInPort(flow)); vlan != 0 {
-					f.addEAPOLFlow(intfID, onuID, uniID, portNo, flow, allocID[0], gemPort, vlan)
-				}
-				// Send Techprofile download event to child device in go routine as it takes time
-				go f.sendTPDownloadMsgToChild(intfID, onuID, uniID, uni)
-			}
-			if ethType == LldpEthType {
-				log.Info("Adding LLDP flow")
-				addLLDPFlow(flow, portNo)
-			}
-		} else if _, ok := actionInfo[PushVlan]; ok {
-			log.Info("Adding upstream data rule")
-			f.addUpstreamDataFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocID[0], gemPort)
-		} else if _, ok := actionInfo[PopVlan]; ok {
-			log.Info("Adding Downstream data rule")
-			f.addDownstreamDataFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocID[0], gemPort)
+
+		} else if ipProto == IgmpProto {
+			log.Info("igmp flow add ignored, not implemented yet")
 		} else {
-			log.Errorw("Invalid-flow-type-to-handle", log.Fields{"classifier": classifierInfo, "action": actionInfo, "flow": flow})
+			log.Errorw("Invalid-Classifier-to-handle", log.Fields{"classifier": classifierInfo, "action": actionInfo})
+			//return errors.New("Invalid-Classifier-to-handle")
 		}
+	} else if ethType, ok := classifierInfo[EthType]; ok {
+		if ethType.(uint32) == EapEthType {
+			log.Info("Adding EAPOL flow")
+			var vlanId uint32
+			if val, ok := classifierInfo[VlanVid]; ok {
+				vlanId = (val.(uint32)) & VlanvIDMask
+			} else {
+				vlanId = DefaultMgmtVlan
+			}
+			if pcp, ok := classifierInfo[VlanPcp]; ok {
+				gemPort = f.techprofile[intfID].GetGemportIDForPbit(TpInst,
+					tp_pb.Direction_UPSTREAM,
+					pcp.(uint32))
+
+				f.addEAPOLFlow(intfID, onuID, uniID, portNo, flow, allocId[0], gemPort, vlanId)
+			} else {
+				installFlowOnAllGemports(nil, f.addEAPOLFlow, args, classifierInfo, actionInfo, flow, gemPorts, EapolFlow, vlanId)
+			}
+			// Send Techprofile download event to child device in go routine as it takes time
+			go f.sendTPDownloadMsgToChild(intfID, onuID, uniID, uni, TpId)
+		}
+		if ethType == LldpEthType {
+			log.Info("Adding LLDP flow")
+			addLLDPFlow(flow, portNo)
+		}
+	} else if _, ok := actionInfo[PushVlan]; ok {
+		log.Info("Adding upstream data rule")
+		if pcp, ok := classifierInfo[VlanPcp]; ok {
+			gemPort = f.techprofile[intfID].GetGemportIDForPbit(TpInst,
+				tp_pb.Direction_UPSTREAM,
+				pcp.(uint32))
+			//Adding HSIA upstream flow
+			f.addUpstreamDataFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocId[0], gemPort)
+		} else {
+			//Adding HSIA upstream flow to all gemports
+			installFlowOnAllGemports(f.addUpstreamDataFlow, nil, args, classifierInfo, actionInfo, flow, gemPorts, HsiaFlow)
+		}
+	} else if _, ok := actionInfo[PopVlan]; ok {
+		log.Info("Adding Downstream data rule")
+		if pcp, ok := classifierInfo[VlanPcp]; ok {
+			gemPort = f.techprofile[intfID].GetGemportIDForPbit(TpInst,
+				tp_pb.Direction_UPSTREAM,
+				pcp.(uint32))
+			//Adding HSIA downstream flow
+			f.addDownstreamDataFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocId[0], gemPort)
+		} else {
+			//Adding HSIA downstream flow to all gemports
+			installFlowOnAllGemports(f.addDownstreamDataFlow, nil, args, classifierInfo, actionInfo, flow, gemPorts, HsiaFlow)
+		}
+	} else {
+		log.Errorw("Invalid-flow-type-to-handle", log.Fields{"classifier": classifierInfo, "action": actionInfo, "flow": flow})
 	}
 }
 
+func (f *OpenOltFlowMgr) CreateSchedulerQueues(Dir tp_pb.Direction, IntfId uint32, OnuId uint32, UniId uint32, UniPort uint32, TpInst *tp.TechProfile, MeterId uint32, flowMetadata *voltha.FlowMetadata) error {
+
+	log.Debugw("CreateSchedulerQueues", log.Fields{"Dir": Dir, "IntfId": IntfId, "OnuId": OnuId,
+		"UniId": UniId, "MeterId": MeterId, "TpInst": *TpInst, "flowMetadata": flowMetadata})
+
+	if MeterId == 0 { // This should never happen
+		log.Error("Invalid meter id")
+		return errors.New("Invalid meter id")
+	}
+
+	/* Lets make a simple assumption that if the meter-id is present on the KV store,
+	 * then the scheduler and queues configuration is applied on the OLT device
+	 * in the given direction.
+	 */
+	var Direction string
+	var SchedCfg *tp_pb.SchedulerConfig
+	if Dir == tp_pb.Direction_UPSTREAM {
+		Direction = "upstream"
+	} else if Dir == tp_pb.Direction_DOWNSTREAM {
+		Direction = "downstream"
+	}
+	KvStoreMeter, err := f.resourceMgr.GetMeterIdForOnu(Direction, IntfId, OnuId, UniId)
+	if err != nil {
+		log.Error("Failed to get meter for intf %d, onuid %d, uniid %d", IntfId, OnuId, UniId)
+		return err
+	}
+	if KvStoreMeter != nil {
+		if KvStoreMeter.MeterId == MeterId {
+			log.Debug("Scheduler already created for upstream")
+			return nil
+		} else {
+			log.Errorw("Dynamic meter update not supported", log.Fields{"KvStoreMeterId": KvStoreMeter.MeterId, "MeterId-in-flow": MeterId})
+			return errors.New("Invalid-meter-id-in-flow")
+		}
+	}
+	log.Debugw("Meter-does-not-exist-Creating-new", log.Fields{"MeterId": MeterId, "Direction": Direction})
+	if Dir == tp_pb.Direction_UPSTREAM {
+		SchedCfg = f.techprofile[IntfId].GetUsScheduler(TpInst)
+	} else if Dir == tp_pb.Direction_DOWNSTREAM {
+		SchedCfg = f.techprofile[IntfId].GetDsScheduler(TpInst)
+	}
+	var meterConfig *ofp.OfpMeterConfig
+	if flowMetadata != nil {
+		for _, meter := range flowMetadata.Meters {
+			if MeterId == meter.MeterId {
+				meterConfig = meter
+				log.Debugw("Found-meter-config-from-flowmetadata", log.Fields{"meterConfig": meterConfig})
+				break
+			}
+		}
+	} else {
+		log.Error("Flow-metadata-is-not-present-in-flow")
+	}
+	if meterConfig == nil {
+		log.Errorw("Could-not-get-meterbands-from-flowMetadata", log.Fields{"flowMetadata": flowMetadata, "MeterId": MeterId})
+		return errors.New("Failed-to-get-meter-from-flowMetadata")
+	} else if len(meterConfig.Bands) < MaxMeterBand {
+		log.Errorw("Invalid-number-of-bands-in-meter", log.Fields{"Bands": meterConfig.Bands, "MeterId": MeterId})
+		return errors.New("Invalid-number-of-bands-in-meter")
+	}
+	cir := meterConfig.Bands[0].Rate
+	cbs := meterConfig.Bands[0].BurstSize
+	eir := meterConfig.Bands[1].Rate
+	ebs := meterConfig.Bands[1].BurstSize
+	pir := cir + eir
+	pbs := cbs + ebs
+	TrafficShaping := &tp_pb.TrafficShapingInfo{Cir: cir, Cbs: cbs, Pir: pir, Pbs: pbs}
+
+	TrafficSched := []*tp_pb.TrafficScheduler{f.techprofile[IntfId].GetTrafficScheduler(TpInst, SchedCfg, TrafficShaping)}
+
+	log.Debugw("Sending Traffic scheduler create to device", log.Fields{"Direction": Direction, "TrafficScheds": TrafficSched})
+	if _, err := f.deviceHandler.Client.CreateTrafficSchedulers(context.Background(), &tp_pb.TrafficSchedulers{
+		IntfId: IntfId, OnuId: OnuId,
+		UniId: UniId, PortNo: UniPort,
+		TrafficScheds: TrafficSched}); err != nil {
+		log.Errorw("Failed to create traffic schedulers", log.Fields{"error": err})
+		return err
+	}
+	// On receiving the CreateTrafficQueues request, the driver should create corresponding
+	// downstream queues.
+	trafficQueues := f.techprofile[IntfId].GetTrafficQueues(TpInst, Dir)
+	log.Debugw("Sending Traffic Queues create to device", log.Fields{"Direction": Direction, "TrafficQueues": trafficQueues})
+	if _, err := f.deviceHandler.Client.CreateTrafficQueues(context.Background(),
+		&tp_pb.TrafficQueues{IntfId: IntfId, OnuId: OnuId,
+			UniId: UniId, PortNo: UniPort,
+			TrafficQueues: trafficQueues}); err != nil {
+		log.Errorw("Failed to create traffic queues in device", log.Fields{"error": err})
+		return err
+	}
+
+	/* After we succesfully applied the scheduler configuration on the OLT device,
+	 * store the meter id on the KV store, for further reference.
+	 */
+	if err := f.resourceMgr.UpdateMeterIdForOnu(Direction, IntfId, OnuId, UniId, meterConfig); err != nil {
+		log.Error("Failed to update meter id for onu %d, meterid %d", OnuId, MeterId)
+		return err
+	}
+	log.Debugw("updated-meter-info into KV store successfully", log.Fields{"Direction": Direction,
+		"Meter": meterConfig})
+	return nil
+}
+
+func (f *OpenOltFlowMgr) RemoveSchedulerQueues(Dir tp_pb.Direction, IntfId uint32, OnuId uint32, UniId uint32, UniPort uint32, TpInst *tp.TechProfile) error {
+
+	var Direction string
+	var SchedCfg *tp_pb.SchedulerConfig
+	var err error
+	log.Debugw("Removing schedulers and Queues in OLT", log.Fields{"Direction": Dir, "IntfId": IntfId, "OnuId": OnuId, "UniId": UniId, "UniPort": UniPort})
+	if Dir == tp_pb.Direction_UPSTREAM {
+		SchedCfg = f.techprofile[IntfId].GetUsScheduler(TpInst)
+		Direction = "upstream"
+	} else if Dir == tp_pb.Direction_DOWNSTREAM {
+		SchedCfg = f.techprofile[IntfId].GetDsScheduler(TpInst)
+		Direction = "downstream"
+	}
+
+	KVStoreMeter, err := f.resourceMgr.GetMeterIdForOnu(Direction, IntfId, OnuId, UniId)
+	if err != nil {
+		log.Errorf("Failed to get Meter for Onu %d", OnuId)
+		return err
+	}
+	if KVStoreMeter == nil {
+		log.Debugw("No-meter-has-been-installed-yet", log.Fields{"direction": Direction, "IntfId": IntfId, "OnuId": OnuId, "UniId": UniId})
+		return nil
+	}
+	cir := KVStoreMeter.Bands[0].Rate
+	cbs := KVStoreMeter.Bands[0].BurstSize
+	eir := KVStoreMeter.Bands[1].Rate
+	ebs := KVStoreMeter.Bands[1].BurstSize
+	pir := cir + eir
+	pbs := cbs + ebs
+
+	TrafficShaping := &tp_pb.TrafficShapingInfo{Cir: cir, Cbs: cbs, Pir: pir, Pbs: pbs}
+
+	TrafficSched := []*tp_pb.TrafficScheduler{f.techprofile[IntfId].GetTrafficScheduler(TpInst, SchedCfg, TrafficShaping)}
+	TrafficQueues := f.techprofile[IntfId].GetTrafficQueues(TpInst, Dir)
+
+	if _, err = f.deviceHandler.Client.RemoveTrafficQueues(context.Background(),
+		&tp_pb.TrafficQueues{IntfId: IntfId, OnuId: OnuId,
+			UniId: UniId, PortNo: UniPort,
+			TrafficQueues: TrafficQueues}); err != nil {
+		log.Error("Failed to remove traffic queues")
+		return err
+	} else {
+		log.Debug("Removed traffic queues successfully")
+	}
+	if _, err = f.deviceHandler.Client.RemoveTrafficSchedulers(context.Background(), &tp_pb.TrafficSchedulers{
+		IntfId: IntfId, OnuId: OnuId,
+		UniId: UniId, PortNo: UniPort,
+		TrafficScheds: TrafficSched}); err != nil {
+		log.Error("failed to remove traffic schedulers")
+		return err
+	} else {
+		log.Debug("Removed traffic schedulers successfully")
+	}
+
+	/* After we succesfully remove the scheduler configuration on the OLT device,
+	 * delete the meter id on the KV store.
+	 */
+	err = f.resourceMgr.RemoveMeterIdForOnu(Direction, IntfId, OnuId, UniId)
+	if err != nil {
+		log.Errorf("Failed to remove meter for onu %d, meter id %d", OnuId, KVStoreMeter.MeterId)
+	}
+	log.Debugw("Removed-meter-from-KV-store successfully", log.Fields{"MeterId": KVStoreMeter.MeterId, "dir": Direction})
+	return err
+}
+
 // This function allocates tconts and GEM ports for an ONU, currently one TCONT is supported per ONU
-func (f *OpenOltFlowMgr) createTcontGemports(intfID uint32, onuID uint32, uniID uint32, uni string, uniPort uint32, tableID uint32) ([]uint32, []uint32) {
+func (f *OpenOltFlowMgr) createTcontGemports(intfID uint32, onuID uint32, uniID uint32, uni string, uniPort uint32, TpID uint32, UsMeterID uint32, DsMeterID uint32, flowMetadata *voltha.FlowMetadata) ([]uint32, []uint32, *tp.TechProfile) {
 	var allocID []uint32
 	var gemPortIDs []uint32
 	//If we already have allocated earlier for this onu, render them
-	if tcontID := f.resourceMgr.GetCurrentAllocIDForOnu(intfID, onuID, uniID); tcontID != 0 {
-		allocID = append(allocID, tcontID)
+	if tcontId := f.resourceMgr.GetCurrentAllocIDForOnu(intfID, onuID, uniID); tcontId != 0 {
+		allocID = append(allocID, tcontId)
 	}
 	gemPortIDs = f.resourceMgr.GetCurrentGEMPortIDsForOnu(intfID, onuID, uniID)
-	if len(allocID) != 0 && len(gemPortIDs) != 0 {
-		log.Debug("Rendered Tcont and GEM ports from resource manager", log.Fields{"intfId": intfID, "onuId": onuID, "uniPort": uniID,
-			"allocID": allocID, "gemPortIDs": gemPortIDs})
-		return allocID, gemPortIDs
+
+	tpPath := f.getTPpath(intfID, uni, TpID)
+	// Check tech profile instance already exists for derived port name
+	tech_profile_instance, err := f.techprofile[intfID].GetTPInstanceFromKVStore(TpID, tpPath)
+	if err != nil { // This should not happen, something wrong in KV backend transaction
+		log.Errorw("Error in fetching tech profile instance from KV store", log.Fields{"tpID": TpID, "path": tpPath})
+		return nil, nil, nil
 	}
+
 	log.Debug("Creating New TConts and Gem ports", log.Fields{"pon": intfID, "onu": onuID, "uni": uniID})
 
-	//FIXME: If table id is <= 63 using 64 as table id
-	if tableID < tp.DEFAULT_TECH_PROFILE_TABLE_ID {
-		tableID = tp.DEFAULT_TECH_PROFILE_TABLE_ID
-	}
-	tpPath := f.getTPpath(intfID, uni)
-	// Check tech profile instance already exists for derived port name
-	techProfileInstance, err := f.techprofile[intfID].GetTPInstanceFromKVStore(tableID, tpPath)
-	if err != nil { // This should not happen, something wrong in KV backend transaction
-		log.Errorw("Error in fetching tech profile instance from KV store", log.Fields{"tableID": tableID, "path": tpPath})
-		return nil, nil
-	}
-	if techProfileInstance == nil {
+	if tech_profile_instance == nil {
 		log.Info("Creating tech profile instance", log.Fields{"path": tpPath})
-		techProfileInstance = f.techprofile[intfID].CreateTechProfInstance(tableID, uni, intfID)
-		if techProfileInstance == nil {
+		tech_profile_instance = f.techprofile[intfID].CreateTechProfInstance(TpID, uni, intfID)
+		if tech_profile_instance == nil {
 			log.Error("Tech-profile-instance-creation-failed")
-			return nil, nil
+			return nil, nil, nil
 		}
+		f.resourceMgr.UpdateTechProfileIdForOnu(intfID, onuID, uniID, TpID)
 	} else {
 		log.Debugw("Tech-profile-instance-already-exist-for-given port-name", log.Fields{"uni": uni})
 	}
-	// Get upstream and downstream scheduler protos
-	usScheduler := f.techprofile[intfID].GetUsScheduler(techProfileInstance)
-	dsScheduler := f.techprofile[intfID].GetDsScheduler(techProfileInstance)
-	// Get TCONTS protos
-	tconts := f.techprofile[intfID].GetTconts(techProfileInstance, usScheduler, dsScheduler)
-	if len(tconts) == 0 {
-		log.Error("TCONTS not found ")
-		return nil, nil
+	if UsMeterID != 0 {
+		if err := f.CreateSchedulerQueues(tp_pb.Direction_UPSTREAM, intfID, onuID, uniID, uniPort, tech_profile_instance, UsMeterID, flowMetadata); err != nil {
+			log.Errorw("CreateSchedulerQueues Failed-upstream", log.Fields{"error": err, "meterID": UsMeterID})
+			return nil, nil, nil
+		}
 	}
-	log.Debugw("Sending Create tcont to device",
-		log.Fields{"onu": onuID, "uni": uniID, "portNo": "", "tconts": tconts})
-	if _, err := f.deviceHandler.Client.CreateTconts(context.Background(),
-		&openoltpb2.Tconts{IntfId: intfID,
-			OnuId:  onuID,
-			UniId:  uniID,
-			PortNo: uniPort,
-			Tconts: tconts}); err != nil {
-		log.Errorw("Error while creating TCONT in device", log.Fields{"error": err})
-		return nil, nil
+	if DsMeterID != 0 {
+		if err := f.CreateSchedulerQueues(tp_pb.Direction_DOWNSTREAM, intfID, onuID, uniID, uniPort, tech_profile_instance, DsMeterID, flowMetadata); err != nil {
+			log.Errorw("CreateSchedulerQueues Failed-downstream", log.Fields{"error": err, "meterID": DsMeterID})
+			return nil, nil, nil
+		}
 	}
-	allocID = append(allocID, techProfileInstance.UsScheduler.AllocID)
-	for _, gem := range techProfileInstance.UpstreamGemPortAttributeList {
-		gemPortIDs = append(gemPortIDs, gem.GemportID)
+	if len(allocID) == 0 { // Created TCONT first time
+		allocID = append(allocID, tech_profile_instance.UsScheduler.AllocID)
+	}
+	if len(gemPortIDs) == 0 { // Create GEM ports first time
+		for _, gem := range tech_profile_instance.UpstreamGemPortAttributeList {
+			gemPortIDs = append(gemPortIDs, gem.GemportID)
+		}
 	}
 	log.Debugw("Allocated Tcont and GEM ports", log.Fields{"allocID": allocID, "gemports": gemPortIDs})
 	// Send Tconts and GEM ports to KV store
 	f.storeTcontsGEMPortsIntoKVStore(intfID, onuID, uniID, allocID, gemPortIDs)
-	return allocID, gemPortIDs
+	return allocID, gemPortIDs, tech_profile_instance
 }
 
 func (f *OpenOltFlowMgr) storeTcontsGEMPortsIntoKVStore(intfID uint32, onuID uint32, uniID uint32, allocID []uint32, gemPortIDs []uint32) {
@@ -352,19 +565,22 @@
 }
 
 func (f *OpenOltFlowMgr) populateTechProfilePerPonPort() error {
+	var tpCount int = 0
 	for _, techRange := range f.resourceMgr.DevInfo.Ranges {
-		for intfID := range techRange.IntfIds {
-			f.techprofile = append(f.techprofile, f.resourceMgr.ResourceMgrs[uint32(intfID)].TechProfileMgr)
+		for _, intfId := range techRange.IntfIds {
+			f.techprofile[intfId] = f.resourceMgr.ResourceMgrs[uint32(intfId)].TechProfileMgr
+			tpCount++
+			log.Debugw("Init tech profile done", log.Fields{"intfId": intfId})
 		}
 	}
 	//Make sure we have as many tech_profiles as there are pon ports on the device
-	if len(f.techprofile) != int(f.resourceMgr.DevInfo.GetPonPorts()) {
+	if tpCount != int(f.resourceMgr.DevInfo.GetPonPorts()) {
 		log.Errorw("Error while populating techprofile",
-			log.Fields{"numofTech": len(f.techprofile), "numPonPorts": f.resourceMgr.DevInfo.GetPonPorts()})
-		return errors.New("error while populating techprofile mgrs")
+			log.Fields{"numofTech": tpCount, "numPonPorts": f.resourceMgr.DevInfo.GetPonPorts()})
+		return errors.New("Error while populating techprofile mgrs")
 	}
-	log.Infow("Populated techprofile per ponport successfully",
-		log.Fields{"numofTech": len(f.techprofile), "numPonPorts": f.resourceMgr.DevInfo.GetPonPorts()})
+	log.Infow("Populated techprofile for ponports successfully",
+		log.Fields{"numofTech": tpCount, "numPonPorts": f.resourceMgr.DevInfo.GetPonPorts()})
 	return nil
 }
 
@@ -386,13 +602,18 @@
 	downlinkClassifier[PacketTagType] = DoubleTag
 	log.Debugw("Adding downstream data flow", log.Fields{"downlinkClassifier": downlinkClassifier,
 		"downlinkAction": downlinkAction})
-	// Ignore private VLAN flow given by decomposer, cannot do anything with this flow
-	if uint32(downlinkClassifier[METADATA].(uint64)) == MkUniPortNum(intfID, onuID, uniID) &&
-		downlinkClassifier[VlanVid] == (uint32(ofp.OfpVlanId_OFPVID_PRESENT)|4000) {
-		log.Infow("EAPOL DL flow , Already added ,ignoring it", log.Fields{"downlinkClassifier": downlinkClassifier,
-			"downlinkAction": downlinkAction})
-		return
+	// Ignore Downlink trap flow given by core, cannot do anything with this flow */
+	if vlan, exists := downlinkClassifier[VlanVid]; exists {
+		if vlan.(uint32) == (uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 4000) { //private VLAN given by core
+			if metadata, exists := downlinkClassifier[METADATA]; exists { // inport is filled in metadata by core
+				if uint32(metadata.(uint64)) == MkUniPortNum(intfID, onuID, uniID) {
+					log.Infow("Ignoring DL trap device flow from core", log.Fields{"flow": logicalFlow})
+					return
+				}
+			}
+		}
 	}
+
 	/* Already this info available classifier? */
 	downlinkAction[PopVlan] = true
 	downlinkAction[VlanVid] = downlinkClassifier[VlanVid]
@@ -413,9 +634,13 @@
 	log.Debugw("Adding HSIA flow", log.Fields{"intfId": intfID, "onuId": onuID, "uniId": uniID, "classifier": classifier,
 		"action": action, "direction": direction, "allocId": allocID, "gemPortId": gemPortID,
 		"logicalFlow": *logicalFlow})
-	flowCategory := "HSIA"
+	var vlan_pit uint32 = 0
+	if _, ok := classifier[VlanPcp]; ok {
+		vlan_pit = classifier[VlanPcp].(uint32)
+		log.Debugw("Found pbit in the flow", log.Fields{"vlan_pit": vlan_pit})
+	}
 	flowStoreCookie := getFlowStoreCookie(classifier, gemPortID)
-	flowID, err := f.resourceMgr.GetFlowID(intfID, onuID, uniID, flowStoreCookie, flowCategory)
+	flowId, err := f.resourceMgr.GetFlowID(intfID, onuID, uniID, gemPortID, flowStoreCookie, HsiaFlow, vlan_pit)
 	if err != nil {
 		log.Errorw("Flow id unavailable for HSIA flow", log.Fields{"direction": direction})
 		return
@@ -436,7 +661,7 @@
 	flow := openoltpb2.Flow{AccessIntfId: int32(intfID),
 		OnuId:         int32(onuID),
 		UniId:         int32(uniID),
-		FlowId:        flowID,
+		FlowId:        flowId,
 		FlowType:      direction,
 		AllocId:       int32(allocID),
 		NetworkIntfId: int32(networkIntfID),
@@ -448,7 +673,7 @@
 		PortNo:        portNo}
 	if ok := f.addFlowToDevice(logicalFlow, &flow); ok {
 		log.Debug("HSIA flow added to device successfully", log.Fields{"direction": direction})
-		flowsToKVStore := f.getUpdatedFlowInfo(&flow, flowStoreCookie, "HSIA", flowID)
+		flowsToKVStore := f.getUpdatedFlowInfo(&flow, flowStoreCookie, HsiaFlow, flowId)
 		if err := f.updateFlowInfoToKVStore(flow.AccessIntfId,
 			flow.OnuId,
 			flow.UniId,
@@ -477,7 +702,7 @@
 
 	flowStoreCookie := getFlowStoreCookie(classifier, gemPortID)
 
-	flowID, err := f.resourceMgr.GetFlowID(intfID, onuID, uniID, flowStoreCookie, "")
+	flowID, err := f.resourceMgr.GetFlowID(intfID, onuID, uniID, gemPortID, flowStoreCookie, "", 0 /*classifier[VLAN_PCP].(uint32)*/)
 
 	if err != nil {
 		log.Errorw("flowId unavailable for UL EAPOL", log.Fields{"intfId": intfID, "onuId": onuID, "flowStoreCookie": flowStoreCookie})
@@ -545,7 +770,7 @@
 	uplinkAction[TrapToHost] = true
 	flowStoreCookie := getFlowStoreCookie(uplinkClassifier, gemPortID)
 	//Add Uplink EAPOL Flow
-	uplinkFlowID, err := f.resourceMgr.GetFlowID(intfID, onuID, uniID, flowStoreCookie, "")
+	uplinkFlowID, err := f.resourceMgr.GetFlowID(intfID, onuID, uniID, gemPortID, flowStoreCookie, "", 0)
 	if err != nil {
 		log.Errorw("flowId unavailable for UL EAPOL", log.Fields{"intfId": intfID, "onuId": onuID, "flowStoreCookie": flowStoreCookie})
 		return
@@ -592,8 +817,8 @@
 			return
 		}
 	}
-
-	if vlanID == DefaultMgmtVlan {
+	// Dummy Downstream flow due to BAL 2.6 limitation
+	{
 		/* Add Downstream EAPOL Flow, Only for first EAP flow (BAL
 		# requirement)
 		# On one of the platforms (Broadcom BAL), when same DL classifier
@@ -613,19 +838,20 @@
 		log.Debugw("specialVlanEAPOLDlFlow:", log.Fields{"dl_vlan": specialVlanDlFlow})
 		// Fill Classfier
 		downlinkClassifier[PacketTagType] = SingleTag
+		downlinkClassifier[EthType] = uint32(EapEthType)
 		downlinkClassifier[VlanVid] = uint32(specialVlanDlFlow)
 		// Fill action
 		downlinkAction[PushVlan] = true
 		downlinkAction[VlanVid] = vlanID
 		flowStoreCookie := getFlowStoreCookie(downlinkClassifier, gemPortID)
-		downlinkFlowID, err := f.resourceMgr.GetFlowID(intfID, onuID, uniID, flowStoreCookie, "")
+		downlinkFlowId, err := f.resourceMgr.GetFlowID(intfID, onuID, uniID, gemPortID, flowStoreCookie, "", 0)
 		if err != nil {
 			log.Errorw("flowId unavailable for DL EAPOL",
 				log.Fields{"intfId": intfID, "onuId": onuID, "flowStoreCookie": flowStoreCookie})
 			return
 		}
 		log.Debugw("Creating DL EAPOL flow",
-			log.Fields{"dl_classifier": downlinkClassifier, "dl_action": downlinkAction, "downlinkFlowId": downlinkFlowID})
+			log.Fields{"dl_classifier": downlinkClassifier, "dl_action": downlinkAction, "downlinkFlowId": downlinkFlowId})
 		if classifierProto = makeOpenOltClassifierField(downlinkClassifier); classifierProto == nil {
 			log.Error("Error in making classifier protobuf for downlink flow")
 			return
@@ -638,7 +864,7 @@
 		downstreamFlow = openoltpb2.Flow{AccessIntfId: int32(intfID),
 			OnuId:         int32(onuID),
 			UniId:         int32(uniID),
-			FlowId:        downlinkFlowID,
+			FlowId:        downlinkFlowId,
 			FlowType:      DOWNSTREAM,
 			AllocId:       int32(allocID),
 			NetworkIntfId: int32(networkIntfID),
@@ -651,7 +877,7 @@
 		if ok := f.addFlowToDevice(logicalFlow, &downstreamFlow); ok {
 			log.Debug("EAPOL DL flow added to device successfully")
 			flowCategory := ""
-			flowsToKVStore := f.getUpdatedFlowInfo(&downstreamFlow, flowStoreCookie, flowCategory, downlinkFlowID)
+			flowsToKVStore := f.getUpdatedFlowInfo(&downstreamFlow, flowStoreCookie, flowCategory, downlinkFlowId)
 			if err := f.updateFlowInfoToKVStore(downstreamFlow.AccessIntfId,
 				downstreamFlow.OnuId,
 				downstreamFlow.UniId,
@@ -662,9 +888,6 @@
 				return
 			}
 		}
-	} else {
-		log.Infow("EAPOL flow with non-default mgmt vlan is not supported", log.Fields{"vlanId": vlanID})
-		return
 	}
 	log.Debugw("Added EAPOL flows to device successfully", log.Fields{"flow": logicalFlow})
 }
@@ -678,13 +901,17 @@
 		classifier.IpProto = ipProto.(uint32)
 	}
 	if vlanID, ok := classifierInfo[VlanVid]; ok {
-		classifier.OVid = (vlanID.(uint32)) & 0xFFF
+		classifier.OVid = (vlanID.(uint32)) & VlanvIDMask
 	}
-	if metadata, ok := classifierInfo[METADATA]; ok { // TODO: Revisit
+	if metadata, ok := classifierInfo[METADATA]; ok {
 		classifier.IVid = uint32(metadata.(uint64))
 	}
 	if vlanPcp, ok := classifierInfo[VlanPcp]; ok {
-		classifier.OPbits = vlanPcp.(uint32)
+		if vlanPcp == 0 {
+			classifier.OPbits = VlanPCPMask
+		} else {
+			classifier.OPbits = (vlanPcp.(uint32)) & VlanPCPMask
+		}
 	}
 	if udpSrc, ok := classifierInfo[UDPSrc]; ok {
 		classifier.SrcPort = udpSrc.(uint32)
@@ -732,14 +959,8 @@
 	return &action
 }
 
-func (f *OpenOltFlowMgr) getTPpath(intfID uint32, uni string) string {
-	/*
-	   FIXME
-	   Should get Table id form the flow, as of now hardcoded to DEFAULT_TECH_PROFILE_TABLE_ID (64)
-	   'tp_path' contains the suffix part of the tech_profile_instance path. The prefix to the 'tp_path' should be set to
-	   TechProfile.KV_STORE_TECH_PROFILE_PATH_PREFIX by the ONU adapter.
-	*/
-	return f.techprofile[intfID].GetTechProfileInstanceKVPath(tp.DEFAULT_TECH_PROFILE_TABLE_ID, uni)
+func (f *OpenOltFlowMgr) getTPpath(intfID uint32, uni string, TpID uint32) string {
+	return f.techprofile[intfID].GetTechProfileInstanceKVPath(TpID, uni)
 }
 
 func getFlowStoreCookie(classifier map[string]interface{}, gemPortID uint32) uint64 {
@@ -832,6 +1053,7 @@
 	if deviceFlow.AccessIntfId != -1 {
 		intfID = uint32(deviceFlow.AccessIntfId)
 	} else {
+		// REVIST : Why ponport is given as network port?
 		intfID = uint32(deviceFlow.NetworkIntfId)
 	}
 
@@ -911,12 +1133,6 @@
 	return nil
 }
 
-func getSubscriberVlan(inPort uint32) uint32 {
-	/* For EAPOL case we will use default VLAN , so will implement later if required */
-	log.Info("unimplemented inport %v", inPort)
-	return 0
-}
-
 func (f *OpenOltFlowMgr) clearFlowsAndSchedulerForLogicalPort(childDevice *voltha.Device, logicalPort *voltha.LogicalPort) {
 	log.Info("unimplemented device %v, logicalport %v", childDevice, logicalPort)
 }
@@ -930,7 +1146,7 @@
 
 func (f *OpenOltFlowMgr) clearFlowFromResourceManager(flow *ofp.OfpFlowStats, flowID uint32, flowDirection string) {
 	log.Debugw("clearFlowFromResourceManager", log.Fields{"flowID": flowID, "flowDirection": flowDirection, "flow": *flow})
-	ponIntf, onuID, uniID, err := FlowExtractInfo(flow, flowDirection)
+	portNum, ponIntf, onuID, uniID, err := FlowExtractInfo(flow, flowDirection)
 	if err != nil {
 		log.Error(err)
 		return
@@ -965,13 +1181,36 @@
 		// For ex: Case of HSIA where same flow is shared
 		// between DS and US.
 		f.updateFlowInfoToKVStore(int32(ponIntf), int32(onuID), int32(uniID), flowID, &updatedFlows)
-		return
+		if len(updatedFlows) == 0 {
+			log.Debugw("Releasing flow Id to resource manager", log.Fields{"ponIntf": ponIntf, "onuId": onuID, "uniId": uniID, "flowId": flowID})
+			f.resourceMgr.FreeFlowID(ponIntf, onuID, uniID, flowID)
+		}
 	}
-	log.Debugw("Releasing flow Id to resource manager", log.Fields{"ponIntf": ponIntf, "onuID": onuID, "uniID": uniID, "flowID": flowID})
-	f.resourceMgr.FreeFlowID(ponIntf, onuID, uniID, flowID)
 	flowIds := f.resourceMgr.GetCurrentFlowIDsForOnu(ponIntf, onuID, uniID)
 	if len(flowIds) == 0 {
-		/* TODO: Remove Upstream and Downstream Schedulers */
+		log.Debugf("Flow count for subscriber %d is zero", onuID)
+		kvstoreTpId := f.resourceMgr.GetTechProfileIdForOnu(ponIntf, onuID, uniID)
+		if kvstoreTpId == 0 {
+			log.Warnw("Could-not-find-techprofile-tableid-for-uni", log.Fields{"ponIntf": ponIntf, "onuId": onuID, "uniId": uniID})
+			return
+		}
+		uni := getUniPortPath(ponIntf, onuID, uniID)
+		tpPath := f.getTPpath(ponIntf, uni, kvstoreTpId)
+		log.Debugw("Getting-techprofile-instance-for-subscriber", log.Fields{"TP-PATH": tpPath})
+		techprofileInst, err := f.techprofile[ponIntf].GetTPInstanceFromKVStore(kvstoreTpId, tpPath)
+		if err != nil { // This should not happen, something wrong in KV backend transaction
+			log.Errorw("Error in fetching tech profile instance from KV store", log.Fields{"tpID": 20, "path": tpPath})
+			return
+		}
+		if techprofileInst == nil {
+			log.Errorw("Tech-profile-instance-does-not-exist-in-KV Store", log.Fields{"tpPath": tpPath})
+			return
+		}
+
+		f.RemoveSchedulerQueues(tp_pb.Direction_UPSTREAM, ponIntf, onuID, uniID, portNum, techprofileInst)
+		f.RemoveSchedulerQueues(tp_pb.Direction_DOWNSTREAM, ponIntf, onuID, uniID, portNum, techprofileInst)
+	} else {
+		log.Debugf("Flow ids for subscriber", log.Fields{"onu": onuID, "flows": flowIds})
 	}
 }
 
@@ -1013,15 +1252,101 @@
 }
 
 // AddFlow add flow to device
-func (f *OpenOltFlowMgr) AddFlow(flow *ofp.OfpFlowStats) {
+func (f *OpenOltFlowMgr) AddFlow(flow *ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) {
 	classifierInfo := make(map[string]interface{})
 	actionInfo := make(map[string]interface{})
-	log.Debug("Adding Flow", log.Fields{"flow": flow})
+	var UsMeterID uint32
+	var DsMeterID uint32
+
+	log.Debug("Adding Flow", log.Fields{"flow": flow, "flowMetadata": flowMetadata})
 	for _, field := range utils.GetOfbFields(flow) {
-		f.updateClassifierInfo(field, classifierInfo)
+		if field.Type == utils.ETH_TYPE {
+			classifierInfo[EthType] = field.GetEthType()
+			log.Debug("field-type-eth-type", log.Fields{"classifierInfo[ETH_TYPE]": classifierInfo[EthType].(uint32)})
+		} else if field.Type == utils.IP_PROTO {
+			classifierInfo[IPProto] = field.GetIpProto()
+			log.Debug("field-type-ip-proto", log.Fields{"classifierInfo[IP_PROTO]": classifierInfo[IPProto].(uint32)})
+		} else if field.Type == utils.IN_PORT {
+			classifierInfo[InPort] = field.GetPort()
+			log.Debug("field-type-in-port", log.Fields{"classifierInfo[IN_PORT]": classifierInfo[InPort].(uint32)})
+		} else if field.Type == utils.VLAN_VID {
+			classifierInfo[VlanVid] = field.GetVlanVid()
+			log.Debug("field-type-vlan-vid", log.Fields{"classifierInfo[VLAN_VID]": classifierInfo[VlanVid].(uint32)})
+		} else if field.Type == utils.VLAN_PCP {
+			classifierInfo[VlanPcp] = field.GetVlanPcp()
+			log.Debug("field-type-vlan-pcp", log.Fields{"classifierInfo[VLAN_PCP]": classifierInfo[VlanPcp].(uint32)})
+		} else if field.Type == utils.UDP_DST {
+			classifierInfo[UDPDst] = field.GetUdpDst()
+			log.Debug("field-type-udp-dst", log.Fields{"classifierInfo[UDP_DST]": classifierInfo[UDPDst].(uint32)})
+		} else if field.Type == utils.UDP_SRC {
+			classifierInfo[UDPSrc] = field.GetUdpSrc()
+			log.Debug("field-type-udp-src", log.Fields{"classifierInfo[UDP_SRC]": classifierInfo[UDPSrc].(uint32)})
+		} else if field.Type == utils.IPV4_DST {
+			classifierInfo[Ipv4Dst] = field.GetIpv4Dst()
+			log.Debug("field-type-ipv4-dst", log.Fields{"classifierInfo[IPV4_DST]": classifierInfo[Ipv4Dst].(uint32)})
+		} else if field.Type == utils.IPV4_SRC {
+			classifierInfo[Ipv4Src] = field.GetIpv4Src()
+			log.Debug("field-type-ipv4-src", log.Fields{"classifierInfo[IPV4_SRC]": classifierInfo[Ipv4Src].(uint32)})
+		} else if field.Type == utils.METADATA {
+			classifierInfo[METADATA] = field.GetTableMetadata()
+			log.Debug("field-type-metadata", log.Fields{"classifierInfo[METADATA]": classifierInfo[METADATA].(uint64)})
+		} else if field.Type == utils.TUNNEL_ID {
+			classifierInfo[TunnelID] = field.GetTunnelId()
+			log.Debug("field-type-tunnelId", log.Fields{"classifierInfo[TUNNEL_ID]": classifierInfo[TunnelID].(uint64)})
+		} else {
+			log.Errorw("Un supported field type", log.Fields{"type": field.Type})
+			return
+		}
 	}
 	for _, action := range utils.GetActions(flow) {
-		f.updateFlowActionInfo(action, actionInfo, classifierInfo)
+		if action.Type == utils.OUTPUT {
+			if out := action.GetOutput(); out != nil {
+				actionInfo[OUTPUT] = out.GetPort()
+				log.Debugw("action-type-output", log.Fields{"out_port": actionInfo[OUTPUT].(uint32)})
+			} else {
+				log.Error("Invalid output port in action")
+				return
+			}
+		} else if action.Type == utils.POP_VLAN {
+			actionInfo[PopVlan] = true
+			log.Debugw("action-type-pop-vlan", log.Fields{"in_port": classifierInfo[InPort].(uint32)})
+		} else if action.Type == utils.PUSH_VLAN {
+			if out := action.GetPush(); out != nil {
+				if tpid := out.GetEthertype(); tpid != 0x8100 {
+					log.Errorw("Invalid ethertype in push action", log.Fields{"ethertype": actionInfo[PushVlan].(int32)})
+				} else {
+					actionInfo[PushVlan] = true
+					actionInfo[TPID] = tpid
+					log.Debugw("action-type-push-vlan",
+						log.Fields{"push_tpid": actionInfo[TPID].(uint32), "in_port": classifierInfo[InPort].(uint32)})
+				}
+			}
+		} else if action.Type == utils.SET_FIELD {
+			if out := action.GetSetField(); out != nil {
+				if field := out.GetField(); field != nil {
+					if ofClass := field.GetOxmClass(); ofClass != ofp.OfpOxmClass_OFPXMC_OPENFLOW_BASIC {
+						log.Errorw("Invalid openflow class", log.Fields{"class": ofClass})
+						return
+					}
+					/*log.Debugw("action-type-set-field",log.Fields{"field": field, "in_port": classifierInfo[IN_PORT].(uint32)})*/
+					if ofbField := field.GetOfbField(); ofbField != nil {
+						if fieldtype := ofbField.GetType(); fieldtype == ofp.OxmOfbFieldTypes_OFPXMT_OFB_VLAN_VID {
+							if vlan := ofbField.GetVlanVid(); vlan != 0 {
+								actionInfo[VlanVid] = vlan & 0xfff
+								log.Debugw("action-set-vlan-vid", log.Fields{"actionInfo[VLAN_VID]": actionInfo[VlanVid].(uint32)})
+							} else {
+								log.Error("No Invalid vlan id in set vlan-vid action")
+							}
+						} else {
+							log.Errorw("unsupported-action-set-field-type", log.Fields{"type": fieldtype})
+						}
+					}
+				}
+			}
+		} else {
+			log.Errorw("Un supported action type", log.Fields{"type": action.Type})
+			return
+		}
 	}
 	/* Controller bound trap flows */
 	if isControllerFlow := IsControllerBoundFlow(actionInfo[OUTPUT].(uint32)); isControllerFlow {
@@ -1035,7 +1360,10 @@
 				log.Error("upstream pon-to-controller-flow, NO-inport-in-tunnelid")
 				return
 			}
-		}
+		} /*else {
+			log.Debugw("Trap on NNI flow currently not supported", log.Fields{"flow": *flow})
+			return
+		}*/
 	} else {
 		log.Debug("Non-Controller flows, getting uniport from tunnelid")
 		// Downstream flow from NNI to PON port , Use tunnel ID as new OUT port / UNI port
@@ -1061,7 +1389,7 @@
 		}
 	}
 	log.Infow("Flow ports", log.Fields{"classifierInfo_inport": classifierInfo[InPort], "action_output": actionInfo[OUTPUT]})
-	portNo, intfID, onuID, uniID := ExtractAccessFromFlow(classifierInfo[InPort].(uint32), actionInfo[OUTPUT].(uint32))
+	portNo, intfId, onuId, uniId := ExtractAccessFromFlow(classifierInfo[InPort].(uint32), actionInfo[OUTPUT].(uint32))
 	if ipProto, ok := classifierInfo[IPProto]; ok {
 		if ipProto.(uint32) == IPProtoDhcp {
 			if udpSrc, ok := classifierInfo[UDPSrc]; ok {
@@ -1073,102 +1401,40 @@
 			}
 		}
 	}
-	f.divideAndAddFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow)
-}
-
-func (f *OpenOltFlowMgr) updateClassifierInfo(field *ofp.OfpOxmOfbField, classifierInfo map[string]interface{}) {
-	if field.Type == utils.ETH_TYPE {
-		classifierInfo[EthType] = field.GetEthType()
-		log.Debug("field-type-eth-type", log.Fields{"classifierInfo[ETH_TYPE]": classifierInfo[EthType].(uint32)})
-	} else if field.Type == utils.IP_PROTO {
-		classifierInfo[IPProto] = field.GetIpProto()
-		log.Debug("field-type-ip-proto", log.Fields{"classifierInfo[IP_PROTO]": classifierInfo[IPProto].(uint32)})
-	} else if field.Type == utils.IN_PORT {
-		classifierInfo[InPort] = field.GetPort()
-		log.Debug("field-type-in-port", log.Fields{"classifierInfo[IN_PORT]": classifierInfo[InPort].(uint32)})
-	} else if field.Type == utils.VLAN_VID {
-		classifierInfo[VlanVid] = field.GetVlanVid()
-		log.Debug("field-type-vlan-vid", log.Fields{"classifierInfo[VLAN_VID]": classifierInfo[VlanVid].(uint32)})
-	} else if field.Type == utils.VLAN_PCP {
-		classifierInfo[VlanPcp] = field.GetVlanPcp()
-		log.Debug("field-type-vlan-pcp", log.Fields{"classifierInfo[VLAN_PCP]": classifierInfo[VlanPcp].(uint32)})
-	} else if field.Type == utils.UDP_DST {
-		classifierInfo[UDPDst] = field.GetUdpDst()
-		log.Debug("field-type-udp-dst", log.Fields{"classifierInfo[UDP_DST]": classifierInfo[UDPDst].(uint32)})
-	} else if field.Type == utils.UDP_SRC {
-		classifierInfo[UDPSrc] = field.GetUdpSrc()
-		log.Debug("field-type-udp-src", log.Fields{"classifierInfo[UDP_SRC]": classifierInfo[UDPSrc].(uint32)})
-	} else if field.Type == utils.IPV4_DST {
-		classifierInfo[Ipv4Dst] = field.GetIpv4Dst()
-		log.Debug("field-type-ipv4-dst", log.Fields{"classifierInfo[IPV4_DST]": classifierInfo[Ipv4Dst].(uint32)})
-	} else if field.Type == utils.IPV4_SRC {
-		classifierInfo[Ipv4Src] = field.GetIpv4Src()
-		log.Debug("field-type-ipv4-src", log.Fields{"classifierInfo[IPV4_SRC]": classifierInfo[Ipv4Src].(uint32)})
-	} else if field.Type == utils.METADATA {
-		classifierInfo[METADATA] = field.GetTableMetadata()
-		log.Debug("field-type-metadata", log.Fields{"classifierInfo[METADATA]": classifierInfo[METADATA].(uint64)})
-	} else if field.Type == utils.TUNNEL_ID {
-		classifierInfo[TunnelID] = field.GetTunnelId()
-		log.Debug("field-type-tunnelId", log.Fields{"classifierInfo[TUNNEL_ID]": classifierInfo[TunnelID].(uint64)})
-	} else {
-		log.Errorw("Un supported field type", log.Fields{"type": field.Type})
+	/* Metadata 8 bytes:
+	    Most Significant 2 Bytes = Inner VLAN
+	    Next 2 Bytes = Tech Profile ID(TPID)
+	    Least Significant 4 Bytes = Port ID
+	   Flow METADATA carries Tech-Profile (TP) ID and is mandatory in all
+	   subscriber related flows.
+	*/
+	metadata := utils.GetMetadataFromWriteMetadataAction(flow)
+	if metadata == 0 {
+		log.Error("Metadata is not present in flow which is mandatory")
 		return
 	}
-}
-
-func (f *OpenOltFlowMgr) updateFlowActionInfo(action *ofp.OfpAction, actionInfo map[string]interface{}, classifierInfo map[string]interface{}) {
-	if action.Type == utils.OUTPUT {
-		if out := action.GetOutput(); out != nil {
-			actionInfo[OUTPUT] = out.GetPort()
-			log.Debugw("action-type-output", log.Fields{"out_port": actionInfo[OUTPUT].(uint32)})
-		} else {
-			log.Error("Invalid output port in action")
-			return
-		}
-	} else if action.Type == utils.POP_VLAN {
-		actionInfo[PopVlan] = true
-		log.Debugw("action-type-pop-vlan", log.Fields{"in_port": classifierInfo[InPort].(uint32)})
-	} else if action.Type == utils.PUSH_VLAN {
-		if out := action.GetPush(); out != nil {
-			if tpid := out.GetEthertype(); tpid != 0x8100 {
-				log.Errorw("Invalid ethertype in push action", log.Fields{"ethertype": actionInfo[PushVlan].(int32)})
-			} else {
-				actionInfo[PushVlan] = true
-				actionInfo[TPID] = tpid
-				log.Debugw("action-type-push-vlan",
-					log.Fields{"push_tpid": actionInfo[TPID].(uint32), "in_port": classifierInfo[InPort].(uint32)})
-			}
-		}
-	} else if action.Type == utils.SET_FIELD {
-		if out := action.GetSetField(); out != nil {
-			if field := out.GetField(); field != nil {
-				if ofClass := field.GetOxmClass(); ofClass != ofp.OfpOxmClass_OFPXMC_OPENFLOW_BASIC {
-					log.Errorw("Invalid openflow class", log.Fields{"class": ofClass})
-					return
-				}
-				/*log.Debugw("action-type-set-field",log.Fields{"field": field, "in_port": classifierInfo[IN_PORT].(uint32)})*/
-				if ofbField := field.GetOfbField(); ofbField != nil {
-					if fieldtype := ofbField.GetType(); fieldtype == ofp.OxmOfbFieldTypes_OFPXMT_OFB_VLAN_VID {
-						if vlan := ofbField.GetVlanVid(); vlan != 0 {
-							actionInfo[VlanVid] = vlan & 0xfff
-							log.Debugw("action-set-vlan-vid", log.Fields{"actionInfo[VLAN_VID]": actionInfo[VlanVid].(uint32)})
-						} else {
-							log.Error("No Invalid vlan id in set vlan-vid action")
-						}
-					} else {
-						log.Errorw("unsupported-action-set-field-type", log.Fields{"type": fieldtype})
-					}
-				}
-			}
-		}
-	} else {
-		log.Errorw("Un supported action type", log.Fields{"type": action.Type})
+	TpID := utils.GetTechProfileIDFromWriteMetaData(metadata)
+	kvstoreTpId := f.resourceMgr.GetTechProfileIdForOnu(intfId, onuId, uniId)
+	if kvstoreTpId == 0 {
+		log.Debugf("tpid-not-present-in-kvstore, using tp id %d from flow metadata", TpID)
+	} else if kvstoreTpId != uint32(TpID) {
+		log.Error(" Tech-profile-updates-not-supported", log.Fields{"Tpid-in-flow": TpID, "kvstore-TpId": kvstoreTpId})
 		return
 	}
+	log.Debugw("TPID for this subcriber", log.Fields{"TpId": TpID, "pon": intfId, "onuId": onuId, "uniId": uniId})
+	if IsUpstream(actionInfo[OUTPUT].(uint32)) {
+		UsMeterID = utils.GetMeterIdFromFlow(flow)
+		log.Debugw("Upstream-flow-meter-id", log.Fields{"UsMeterID": UsMeterID})
+	} else {
+		DsMeterID = utils.GetMeterIdFromFlow(flow)
+		log.Debugw("Downstream-flow-meter-id", log.Fields{"DsMeterID": DsMeterID})
+
+	}
+	f.divideAndAddFlow(intfId, onuId, uniId, portNo, classifierInfo, actionInfo, flow, uint32(TpID), UsMeterID, DsMeterID, flowMetadata)
 }
 
 //sendTPDownloadMsgToChild send payload
-func (f *OpenOltFlowMgr) sendTPDownloadMsgToChild(intfID uint32, onuID uint32, uniID uint32, uni string) error {
+func (f *OpenOltFlowMgr) sendTPDownloadMsgToChild(intfID uint32, onuID uint32, uniID uint32, uni string, TpID uint32) error {
 
 	onuDevice, err := f.getOnuChildDevice(intfID, onuID)
 	if err != nil {
@@ -1177,7 +1443,7 @@
 	}
 	log.Debugw("Got child device from OLT device handler", log.Fields{"device": *onuDevice})
 
-	tpPath := f.getTPpath(intfID, uni)
+	tpPath := f.getTPpath(intfID, uni, TpID)
 	tpDownloadMsg := &ic.InterAdapterTechProfileDownloadMessage{UniId: uniID, Path: tpPath}
 	log.Infow("Sending Load-tech-profile-request-to-brcm-onu-adapter", log.Fields{"msg": *tpDownloadMsg})
 	sendErr := f.deviceHandler.AdapterProxy.SendInterAdapterMessage(context.Background(),
@@ -1280,6 +1546,31 @@
 	return gemPortID, err
 }
 
+func installFlowOnAllGemports(
+	f1 func(intfId uint32, onuId uint32, uniId uint32,
+		portNo uint32, classifier map[string]interface{}, action map[string]interface{},
+		logicalFlow *ofp.OfpFlowStats, allocId uint32, gemPortId uint32),
+	f2 func(intfId uint32, onuId uint32, uniId uint32, portNo uint32,
+		logicalFlow *ofp.OfpFlowStats, allocId uint32, gemPortId uint32, vlanId uint32),
+	args map[string]uint32,
+	classifier map[string]interface{}, action map[string]interface{},
+	logicalFlow *ofp.OfpFlowStats,
+	gemPorts []uint32,
+	FlowType string,
+	vlanId ...uint32) {
+	log.Debugw("Installing flow on all GEM ports", log.Fields{"FlowType": FlowType, "gemPorts": gemPorts, "vlan": vlanId})
+	for _, gemPortId := range gemPorts {
+		if FlowType == HsiaFlow || FlowType == DhcpFlow {
+			f1(args["intfId"], args["onuId"], args["uniId"], args["portNo"], classifier, action, logicalFlow, args["allocId"], gemPortId)
+		} else if FlowType == EapolFlow {
+			f2(args["intfId"], args["onuId"], args["uniId"], args["portNo"], logicalFlow, args["allocId"], gemPortId, vlanId[0])
+		} else {
+			log.Errorw("Unrecognized Flow Type", log.Fields{"FlowType": FlowType})
+			return
+		}
+	}
+}
+
 func (f *OpenOltFlowMgr) addDHCPTrapFlowOnNNI(logicalFlow *ofp.OfpFlowStats, classifier map[string]interface{}, portNo uint32) {
 	log.Debug("Adding trap-dhcp-of-nni-flow")
 	action := make(map[string]interface{})
@@ -1306,7 +1597,7 @@
 		log.Debug("Flow-exists--not-re-adding")
 		return
 	}
-	flowID, err := f.resourceMgr.GetFlowID(uint32(networkInterfaceID), uint32(onuID), uint32(uniID), flowStoreCookie, "")
+	flowID, err := f.resourceMgr.GetFlowID(uint32(networkInterfaceID), uint32(onuID), uint32(uniID), uint32(gemPortID), flowStoreCookie, "", 0)
 	if err != nil {
 		log.Errorw("Flow id unavailable for DHCP traponNNI flow", log.Fields{"error": err})
 		return