Added SCA fix for voltha-openolt-adapter
Rebased openolt-adapter
Added log for error, took change for TP download for all the flows

Change-Id: Iaecfc346633bb8c17cd54085a85f8c74f3951d50
diff --git a/adaptercore/device_handler.go b/adaptercore/device_handler.go
index 5bf9417..287f927 100644
--- a/adaptercore/device_handler.go
+++ b/adaptercore/device_handler.go
@@ -45,9 +45,10 @@
+// Constants for number of retries and for timeout
 const (
-	MAX_RETRY         = 10
+	MaxRetry       = 10
+	MaxTimeOutInMs = 500
 //DeviceHandler will interact with the OLT device.
diff --git a/adaptercore/olt_platform.go b/adaptercore/olt_platform.go
index 72eb124..5540392 100644
--- a/adaptercore/olt_platform.go
+++ b/adaptercore/olt_platform.go
@@ -206,7 +206,7 @@
 	if uniPortNo == 0 {
-		return 0, 0, 0, 0, errors.New("Failed to extract Pon Interface, ONU Id and Uni Id from flow")
+		return 0, 0, 0, 0, errors.New("failed to extract Pon Interface, ONU Id and Uni Id from flow")
 	ponIntf = IntfIDFromUniPortNum(uniPortNo)
diff --git a/adaptercore/openolt_flowmgr.go b/adaptercore/openolt_flowmgr.go
index fae77a9..d47e716 100644
--- a/adaptercore/openolt_flowmgr.go
+++ b/adaptercore/openolt_flowmgr.go
@@ -212,14 +212,16 @@
 	log.Debugw("updated Stored flow info", log.Fields{"storedDeviceFlows": f.storedDeviceFlows})
-func (f *OpenOltFlowMgr) divideAndAddFlow(intfID uint32, onuID uint32, uniID uint32, portNo uint32, classifierInfo map[string]interface{}, actionInfo map[string]interface{}, flow *ofp.OfpFlowStats, TpId uint32, UsMeterId uint32, DsMeterId uint32, flowMetadata *voltha.FlowMetadata) {
-	var allocId []uint32
+func (f *OpenOltFlowMgr) divideAndAddFlow(intfID uint32, onuID uint32, uniID uint32, portNo uint32,
+	classifierInfo map[string]interface{}, actionInfo map[string]interface{}, flow *ofp.OfpFlowStats, TpID uint32,
+	UsMeterID uint32, DsMeterID uint32, flowMetadata *voltha.FlowMetadata) {
+	var allocID []uint32
 	var gemPorts []uint32
 	var gemPort uint32
 	var TpInst *tp.TechProfile
 	log.Infow("Dividing flow", log.Fields{"intfId": intfID, "onuId": onuID, "uniId": uniID, "portNo": portNo,
-		"classifier": classifierInfo, "action": actionInfo, "UsMeterId": UsMeterId, "DsMeterId": DsMeterId, "TpId": TpId})
+		"classifier": classifierInfo, "action": actionInfo, "UsMeterID": UsMeterID, "DsMeterID": DsMeterID, "TpID": TpID})
 	// only create tcont/gemports if there is actually an onu id.  otherwise BAL throws an error.  Usually this
 	// is because the flow is an NNI flow and there would be no onu resources associated with it
 	// TODO: properly deal with NNI flows
@@ -230,8 +232,8 @@
 	uni := getUniPortPath(intfID, onuID, uniID)
 	log.Debugw("Uni port name", log.Fields{"uni": uni})
-	allocId, gemPorts, TpInst = f.createTcontGemports(intfID, onuID, uniID, uni, portNo, TpId, UsMeterId, DsMeterId, flowMetadata)
-	if allocId == nil || gemPorts == nil || TpInst == nil {
+	allocID, gemPorts, TpInst = f.createTcontGemports(intfID, onuID, uniID, uni, portNo, TpID, UsMeterID, DsMeterID, flowMetadata)
+	if allocID == nil || gemPorts == nil || TpInst == nil {
@@ -245,130 +247,51 @@
 	args["onuId"] = onuID
 	args["uniId"] = uniID
 	args["portNo"] = portNo
-	args["allocId"] = allocId[0]
+	args["allocId"] = allocID[0]
-	if ipProto, ok := classifierInfo[IPProto]; ok {
-		if ipProto.(uint32) == IPProtoDhcp {
-			log.Info("Adding DHCP flow")
-			if pcp, ok := classifierInfo[VlanPcp]; ok {
-				gemPort = f.techprofile[intfID].GetGemportIDForPbit(TpInst,
-					tp_pb.Direction_UPSTREAM,
-					pcp.(uint32))
-				//Adding DHCP upstream flow
-				f.addDHCPTrapFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocId[0], gemPort)
-			} else {
-				//Adding DHCP upstream flow to all gemports
-				installFlowOnAllGemports(f.addDHCPTrapFlow, nil, args, classifierInfo, actionInfo, flow, gemPorts, DhcpFlow)
-			}
-		} else if ipProto == IgmpProto {
-			log.Info("igmp flow add ignored, not implemented yet")
-			return
-		} else {
-			log.Errorw("Invalid-Classifier-to-handle", log.Fields{"classifier": classifierInfo, "action": actionInfo})
-			return
-		}
-	} else if ethType, ok := classifierInfo[EthType]; ok {
-		if ethType.(uint32) == EapEthType {
-			log.Info("Adding EAPOL flow")
-			var vlanId uint32
-			if val, ok := classifierInfo[VlanVid]; ok {
-				vlanId = (val.(uint32)) & VlanvIDMask
-			} else {
-				vlanId = DefaultMgmtVlan
-			}
-			if pcp, ok := classifierInfo[VlanPcp]; ok {
-				gemPort = f.techprofile[intfID].GetGemportIDForPbit(TpInst,
-					tp_pb.Direction_UPSTREAM,
-					pcp.(uint32))
-				f.addEAPOLFlow(intfID, onuID, uniID, portNo, flow, allocId[0], gemPort, vlanId)
-			} else {
-				installFlowOnAllGemports(nil, f.addEAPOLFlow, args, classifierInfo, actionInfo, flow, gemPorts, EapolFlow, vlanId)
-			}
-		}
-		if ethType == LldpEthType {
-			log.Info("Adding LLDP flow")
-			addLLDPFlow(flow, portNo)
-			return
-		}
-	} else if _, ok := actionInfo[PushVlan]; ok {
-		log.Info("Adding upstream data rule")
-		if pcp, ok := classifierInfo[VlanPcp]; ok {
-			gemPort = f.techprofile[intfID].GetGemportIDForPbit(TpInst,
-				tp_pb.Direction_UPSTREAM,
-				pcp.(uint32))
-			//Adding HSIA upstream flow
-			f.addUpstreamDataFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocId[0], gemPort)
-		} else {
-			//Adding HSIA upstream flow to all gemports
-			installFlowOnAllGemports(f.addUpstreamDataFlow, nil, args, classifierInfo, actionInfo, flow, gemPorts, HsiaFlow)
-		}
-	} else if _, ok := actionInfo[PopVlan]; ok {
-		log.Info("Adding Downstream data rule")
-		if pcp, ok := classifierInfo[VlanPcp]; ok {
-			gemPort = f.techprofile[intfID].GetGemportIDForPbit(TpInst,
-				tp_pb.Direction_UPSTREAM,
-				pcp.(uint32))
-			//Adding HSIA downstream flow
-			f.addDownstreamDataFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocId[0], gemPort)
-		} else {
-			//Adding HSIA downstream flow to all gemports
-			installFlowOnAllGemports(f.addDownstreamDataFlow, nil, args, classifierInfo, actionInfo, flow, gemPorts, HsiaFlow)
-		}
-	} else {
-		log.Errorw("Invalid-flow-type-to-handle", log.Fields{"classifier": classifierInfo, "action": actionInfo, "flow": flow})
-		return
-	}
-	// Send Techprofile download event to child device in go routine as it takes time
-	go f.sendTPDownloadMsgToChild(intfID, onuID, uniID, uni, TpId)
+	f.checkAndAddFlow(args, classifierInfo, actionInfo, flow, gemPort, intfID, onuID, uniID, portNo, TpInst, allocID, gemPorts, TpID, uni)
-func (f *OpenOltFlowMgr) CreateSchedulerQueues(Dir tp_pb.Direction, IntfId uint32, OnuId uint32, UniId uint32, UniPort uint32, TpInst *tp.TechProfile, MeterId uint32, flowMetadata *voltha.FlowMetadata) error {
+// CreateSchedulerQueues creates traffic schedulers on the device with the given scheduler configuration and traffic shaping info
+func (f *OpenOltFlowMgr) CreateSchedulerQueues(Dir tp_pb.Direction, IntfID uint32, OnuID uint32, UniID uint32, UniPort uint32, TpInst *tp.TechProfile, MeterID uint32, flowMetadata *voltha.FlowMetadata) error {
-	log.Debugw("CreateSchedulerQueues", log.Fields{"Dir": Dir, "IntfId": IntfId, "OnuId": OnuId,
-		"UniId": UniId, "MeterId": MeterId, "TpInst": *TpInst, "flowMetadata": flowMetadata})
+	log.Debugw("CreateSchedulerQueues", log.Fields{"Dir": Dir, "IntfID": IntfID, "OnuID": OnuID,
+		"UniID": UniID, "MeterID": MeterID, "TpInst": *TpInst, "flowMetadata": flowMetadata})
-	if MeterId == 0 { // This should never happen
-		log.Error("Invalid meter id")
-		return errors.New("Invalid meter id")
+	Direction, err := verifyMeterIDAndGetDirection(MeterID, Dir)
+	if err != nil {
+		return err
 	/* Lets make a simple assumption that if the meter-id is present on the KV store,
 	 * then the scheduler and queues configuration is applied on the OLT device
 	 * in the given direction.
-	var Direction string
 	var SchedCfg *tp_pb.SchedulerConfig
-	if Dir == tp_pb.Direction_UPSTREAM {
-		Direction = "upstream"
-	} else if Dir == tp_pb.Direction_DOWNSTREAM {
-		Direction = "downstream"
-	}
-	KvStoreMeter, err := f.resourceMgr.GetMeterIdForOnu(Direction, IntfId, OnuId, UniId)
+	KvStoreMeter, err := f.resourceMgr.GetMeterIDForOnu(Direction, IntfID, OnuID, UniID)
 	if err != nil {
-		log.Error("Failed to get meter for intf %d, onuid %d, uniid %d", IntfId, OnuId, UniId)
+		log.Error("Failed to get meter for intf %d, onuid %d, uniid %d", IntfID, OnuID, UniID)
 		return err
 	if KvStoreMeter != nil {
-		if KvStoreMeter.MeterId == MeterId {
+		if KvStoreMeter.MeterId == MeterID {
 			log.Debug("Scheduler already created for upstream")
 			return nil
-		} else {
-			log.Errorw("Dynamic meter update not supported", log.Fields{"KvStoreMeterId": KvStoreMeter.MeterId, "MeterId-in-flow": MeterId})
-			return errors.New("Invalid-meter-id-in-flow")
+		log.Errorw("Dynamic meter update not supported", log.Fields{"KvStoreMeterId": KvStoreMeter.MeterId, "MeterID-in-flow": MeterID})
+		return errors.New("invalid-meter-id-in-flow")
-	log.Debugw("Meter-does-not-exist-Creating-new", log.Fields{"MeterId": MeterId, "Direction": Direction})
+	log.Debugw("Meter-does-not-exist-Creating-new", log.Fields{"MeterID": MeterID, "Direction": Direction})
 	if Dir == tp_pb.Direction_UPSTREAM {
-		SchedCfg = f.techprofile[IntfId].GetUsScheduler(TpInst)
+		SchedCfg = f.techprofile[IntfID].GetUsScheduler(TpInst)
 	} else if Dir == tp_pb.Direction_DOWNSTREAM {
-		SchedCfg = f.techprofile[IntfId].GetDsScheduler(TpInst)
+		SchedCfg = f.techprofile[IntfID].GetDsScheduler(TpInst)
 	var meterConfig *ofp.OfpMeterConfig
 	if flowMetadata != nil {
 		for _, meter := range flowMetadata.Meters {
-			if MeterId == meter.MeterId {
+			if MeterID == meter.MeterId {
 				meterConfig = meter
 				log.Debugw("Found-meter-config-from-flowmetadata", log.Fields{"meterConfig": meterConfig})
@@ -378,11 +301,11 @@
 	if meterConfig == nil {
-		log.Errorw("Could-not-get-meterbands-from-flowMetadata", log.Fields{"flowMetadata": flowMetadata, "MeterId": MeterId})
-		return errors.New("Failed-to-get-meter-from-flowMetadata")
+		log.Errorw("Could-not-get-meterbands-from-flowMetadata", log.Fields{"flowMetadata": flowMetadata, "MeterID": MeterID})
+		return errors.New("failed-to-get-meter-from-flowMetadata")
 	} else if len(meterConfig.Bands) < MaxMeterBand {
-		log.Errorw("Invalid-number-of-bands-in-meter", log.Fields{"Bands": meterConfig.Bands, "MeterId": MeterId})
-		return errors.New("Invalid-number-of-bands-in-meter")
+		log.Errorw("Invalid-number-of-bands-in-meter", log.Fields{"Bands": meterConfig.Bands, "MeterID": MeterID})
+		return errors.New("invalid-number-of-bands-in-meter")
 	cir := meterConfig.Bands[0].Rate
 	cbs := meterConfig.Bands[0].BurstSize
@@ -392,33 +315,33 @@
 	pbs := cbs + ebs
 	TrafficShaping := &tp_pb.TrafficShapingInfo{Cir: cir, Cbs: cbs, Pir: pir, Pbs: pbs}
-	TrafficSched := []*tp_pb.TrafficScheduler{f.techprofile[IntfId].GetTrafficScheduler(TpInst, SchedCfg, TrafficShaping)}
+	TrafficSched := []*tp_pb.TrafficScheduler{f.techprofile[IntfID].GetTrafficScheduler(TpInst, SchedCfg, TrafficShaping)}
 	log.Debugw("Sending Traffic scheduler create to device", log.Fields{"Direction": Direction, "TrafficScheds": TrafficSched})
 	if _, err := f.deviceHandler.Client.CreateTrafficSchedulers(context.Background(), &tp_pb.TrafficSchedulers{
-		IntfId: IntfId, OnuId: OnuId,
-		UniId: UniId, PortNo: UniPort,
+		IntfId: IntfID, OnuId: OnuID,
+		UniId: UniID, PortNo: UniPort,
 		TrafficScheds: TrafficSched}); err != nil {
 		log.Errorw("Failed to create traffic schedulers", log.Fields{"error": err})
 		return err
 	// On receiving the CreateTrafficQueues request, the driver should create corresponding
 	// downstream queues.
-	trafficQueues := f.techprofile[IntfId].GetTrafficQueues(TpInst, Dir)
+	trafficQueues := f.techprofile[IntfID].GetTrafficQueues(TpInst, Dir)
 	log.Debugw("Sending Traffic Queues create to device", log.Fields{"Direction": Direction, "TrafficQueues": trafficQueues})
 	if _, err := f.deviceHandler.Client.CreateTrafficQueues(context.Background(),
-		&tp_pb.TrafficQueues{IntfId: IntfId, OnuId: OnuId,
-			UniId: UniId, PortNo: UniPort,
+		&tp_pb.TrafficQueues{IntfId: IntfID, OnuId: OnuID,
+			UniId: UniID, PortNo: UniPort,
 			TrafficQueues: trafficQueues}); err != nil {
 		log.Errorw("Failed to create traffic queues in device", log.Fields{"error": err})
 		return err
-	/* After we succesfully applied the scheduler configuration on the OLT device,
+	/* After we successfully applied the scheduler configuration on the OLT device,
 	 * store the meter id on the KV store, for further reference.
-	if err := f.resourceMgr.UpdateMeterIdForOnu(Direction, IntfId, OnuId, UniId, meterConfig); err != nil {
-		log.Error("Failed to update meter id for onu %d, meterid %d", OnuId, MeterId)
+	if err := f.resourceMgr.UpdateMeterIDForOnu(Direction, IntfID, OnuID, UniID, meterConfig); err != nil {
+		log.Error("Failed to update meter id for onu %d, meterid %d", OnuID, MeterID)
 		return err
 	log.Debugw("updated-meter-info into KV store successfully", log.Fields{"Direction": Direction,
@@ -426,27 +349,28 @@
 	return nil
-func (f *OpenOltFlowMgr) RemoveSchedulerQueues(Dir tp_pb.Direction, IntfId uint32, OnuId uint32, UniId uint32, UniPort uint32, TpInst *tp.TechProfile) error {
+// RemoveSchedulerQueues removes the traffic schedulers from the device based on the given scheduler configuration and traffic shaping info
+func (f *OpenOltFlowMgr) RemoveSchedulerQueues(Dir tp_pb.Direction, IntfID uint32, OnuID uint32, UniID uint32, UniPort uint32, TpInst *tp.TechProfile) error {
 	var Direction string
 	var SchedCfg *tp_pb.SchedulerConfig
 	var err error
-	log.Debugw("Removing schedulers and Queues in OLT", log.Fields{"Direction": Dir, "IntfId": IntfId, "OnuId": OnuId, "UniId": UniId, "UniPort": UniPort})
+	log.Debugw("Removing schedulers and Queues in OLT", log.Fields{"Direction": Dir, "IntfID": IntfID, "OnuID": OnuID, "UniID": UniID, "UniPort": UniPort})
 	if Dir == tp_pb.Direction_UPSTREAM {
-		SchedCfg = f.techprofile[IntfId].GetUsScheduler(TpInst)
+		SchedCfg = f.techprofile[IntfID].GetUsScheduler(TpInst)
 		Direction = "upstream"
 	} else if Dir == tp_pb.Direction_DOWNSTREAM {
-		SchedCfg = f.techprofile[IntfId].GetDsScheduler(TpInst)
+		SchedCfg = f.techprofile[IntfID].GetDsScheduler(TpInst)
 		Direction = "downstream"
-	KVStoreMeter, err := f.resourceMgr.GetMeterIdForOnu(Direction, IntfId, OnuId, UniId)
+	KVStoreMeter, err := f.resourceMgr.GetMeterIDForOnu(Direction, IntfID, OnuID, UniID)
 	if err != nil {
-		log.Errorf("Failed to get Meter for Onu %d", OnuId)
+		log.Errorf("Failed to get Meter for Onu %d", OnuID)
 		return err
 	if KVStoreMeter == nil {
-		log.Debugw("No-meter-has-been-installed-yet", log.Fields{"direction": Direction, "IntfId": IntfId, "OnuId": OnuId, "UniId": UniId})
+		log.Debugw("No-meter-has-been-installed-yet", log.Fields{"direction": Direction, "IntfID": IntfID, "OnuID": OnuID, "UniID": UniID})
 		return nil
 	cir := KVStoreMeter.Bands[0].Rate
@@ -458,34 +382,34 @@
 	TrafficShaping := &tp_pb.TrafficShapingInfo{Cir: cir, Cbs: cbs, Pir: pir, Pbs: pbs}
-	TrafficSched := []*tp_pb.TrafficScheduler{f.techprofile[IntfId].GetTrafficScheduler(TpInst, SchedCfg, TrafficShaping)}
-	TrafficQueues := f.techprofile[IntfId].GetTrafficQueues(TpInst, Dir)
+	TrafficSched := []*tp_pb.TrafficScheduler{f.techprofile[IntfID].GetTrafficScheduler(TpInst, SchedCfg, TrafficShaping)}
+	TrafficQueues := f.techprofile[IntfID].GetTrafficQueues(TpInst, Dir)
 	if _, err = f.deviceHandler.Client.RemoveTrafficQueues(context.Background(),
-		&tp_pb.TrafficQueues{IntfId: IntfId, OnuId: OnuId,
-			UniId: UniId, PortNo: UniPort,
+		&tp_pb.TrafficQueues{IntfId: IntfID, OnuId: OnuID,
+			UniId: UniID, PortNo: UniPort,
 			TrafficQueues: TrafficQueues}); err != nil {
-		log.Error("Failed to remove traffic queues")
+		log.Errorw("Failed to remove traffic queues", log.Fields{"error": err})
 		return err
-	} else {
-		log.Debug("Removed traffic queues successfully")
+	log.Debug("Removed traffic queues successfully")
 	if _, err = f.deviceHandler.Client.RemoveTrafficSchedulers(context.Background(), &tp_pb.TrafficSchedulers{
-		IntfId: IntfId, OnuId: OnuId,
-		UniId: UniId, PortNo: UniPort,
+		IntfId: IntfID, OnuId: OnuID,
+		UniId: UniID, PortNo: UniPort,
 		TrafficScheds: TrafficSched}); err != nil {
-		log.Error("failed to remove traffic schedulers")
+		log.Errorw("failed to remove traffic schedulers", log.Fields{"error": err})
 		return err
-	} else {
-		log.Debug("Removed traffic schedulers successfully")
-	/* After we succesfully remove the scheduler configuration on the OLT device,
+	log.Debug("Removed traffic schedulers successfully")
+	/* After we successfully remove the scheduler configuration on the OLT device,
 	 * delete the meter id on the KV store.
-	err = f.resourceMgr.RemoveMeterIdForOnu(Direction, IntfId, OnuId, UniId)
+	err = f.resourceMgr.RemoveMeterIDForOnu(Direction, IntfID, OnuID, UniID)
 	if err != nil {
-		log.Errorf("Failed to remove meter for onu %d, meter id %d", OnuId, KVStoreMeter.MeterId)
+		log.Errorf("Failed to remove meter for onu %d, meter id %d", OnuID, KVStoreMeter.MeterId)
+		return err
 	log.Debugw("Removed-meter-from-KV-store successfully", log.Fields{"MeterId": KVStoreMeter.MeterId, "dir": Direction})
 	return err
@@ -496,14 +420,14 @@
 	var allocID []uint32
 	var gemPortIDs []uint32
 	//If we already have allocated earlier for this onu, render them
-	if tcontId := f.resourceMgr.GetCurrentAllocIDForOnu(intfID, onuID, uniID); tcontId != 0 {
-		allocID = append(allocID, tcontId)
+	if tcontID := f.resourceMgr.GetCurrentAllocIDForOnu(intfID, onuID, uniID); tcontID != 0 {
+		allocID = append(allocID, tcontID)
 	gemPortIDs = f.resourceMgr.GetCurrentGEMPortIDsForOnu(intfID, onuID, uniID)
 	tpPath := f.getTPpath(intfID, uni, TpID)
 	// Check tech profile instance already exists for derived port name
-	tech_profile_instance, err := f.techprofile[intfID].GetTPInstanceFromKVStore(TpID, tpPath)
+	techProfileInstance, err := f.techprofile[intfID].GetTPInstanceFromKVStore(TpID, tpPath)
 	if err != nil { // This should not happen, something wrong in KV backend transaction
 		log.Errorw("Error in fetching tech profile instance from KV store", log.Fields{"tpID": TpID, "path": tpPath})
 		return nil, nil, nil
@@ -511,41 +435,41 @@
 	log.Debug("Creating New TConts and Gem ports", log.Fields{"pon": intfID, "onu": onuID, "uni": uniID})
-	if tech_profile_instance == nil {
+	if techProfileInstance == nil {
 		log.Info("Creating tech profile instance", log.Fields{"path": tpPath})
-		tech_profile_instance = f.techprofile[intfID].CreateTechProfInstance(TpID, uni, intfID)
-		if tech_profile_instance == nil {
+		techProfileInstance = f.techprofile[intfID].CreateTechProfInstance(TpID, uni, intfID)
+		if techProfileInstance == nil {
 			return nil, nil, nil
-		f.resourceMgr.UpdateTechProfileIdForOnu(intfID, onuID, uniID, TpID)
+		f.resourceMgr.UpdateTechProfileIDForOnu(intfID, onuID, uniID, TpID)
 	} else {
 		log.Debugw("Tech-profile-instance-already-exist-for-given port-name", log.Fields{"uni": uni})
 	if UsMeterID != 0 {
-		if err := f.CreateSchedulerQueues(tp_pb.Direction_UPSTREAM, intfID, onuID, uniID, uniPort, tech_profile_instance, UsMeterID, flowMetadata); err != nil {
+		if err := f.CreateSchedulerQueues(tp_pb.Direction_UPSTREAM, intfID, onuID, uniID, uniPort, techProfileInstance, UsMeterID, flowMetadata); err != nil {
 			log.Errorw("CreateSchedulerQueues Failed-upstream", log.Fields{"error": err, "meterID": UsMeterID})
 			return nil, nil, nil
 	if DsMeterID != 0 {
-		if err := f.CreateSchedulerQueues(tp_pb.Direction_DOWNSTREAM, intfID, onuID, uniID, uniPort, tech_profile_instance, DsMeterID, flowMetadata); err != nil {
+		if err := f.CreateSchedulerQueues(tp_pb.Direction_DOWNSTREAM, intfID, onuID, uniID, uniPort, techProfileInstance, DsMeterID, flowMetadata); err != nil {
 			log.Errorw("CreateSchedulerQueues Failed-downstream", log.Fields{"error": err, "meterID": DsMeterID})
 			return nil, nil, nil
 	if len(allocID) == 0 { // Created TCONT first time
-		allocID = append(allocID, tech_profile_instance.UsScheduler.AllocID)
+		allocID = append(allocID, techProfileInstance.UsScheduler.AllocID)
 	if len(gemPortIDs) == 0 { // Create GEM ports first time
-		for _, gem := range tech_profile_instance.UpstreamGemPortAttributeList {
+		for _, gem := range techProfileInstance.UpstreamGemPortAttributeList {
 			gemPortIDs = append(gemPortIDs, gem.GemportID)
 	log.Debugw("Allocated Tcont and GEM ports", log.Fields{"allocID": allocID, "gemports": gemPortIDs})
 	// Send Tconts and GEM ports to KV store
 	f.storeTcontsGEMPortsIntoKVStore(intfID, onuID, uniID, allocID, gemPortIDs)
-	return allocID, gemPortIDs, tech_profile_instance
+	return allocID, gemPortIDs, techProfileInstance
 func (f *OpenOltFlowMgr) storeTcontsGEMPortsIntoKVStore(intfID uint32, onuID uint32, uniID uint32, allocID []uint32, gemPortIDs []uint32) {
@@ -569,19 +493,19 @@
 func (f *OpenOltFlowMgr) populateTechProfilePerPonPort() error {
-	var tpCount int = 0
+	var tpCount int
 	for _, techRange := range f.resourceMgr.DevInfo.Ranges {
-		for _, intfId := range techRange.IntfIds {
-			f.techprofile[intfId] = f.resourceMgr.ResourceMgrs[uint32(intfId)].TechProfileMgr
+		for _, intfID := range techRange.IntfIds {
+			f.techprofile[intfID] = f.resourceMgr.ResourceMgrs[uint32(intfID)].TechProfileMgr
-			log.Debugw("Init tech profile done", log.Fields{"intfId": intfId})
+			log.Debugw("Init tech profile done", log.Fields{"intfID": intfID})
 	//Make sure we have as many tech_profiles as there are pon ports on the device
 	if tpCount != int(f.resourceMgr.DevInfo.GetPonPorts()) {
 		log.Errorw("Error while populating techprofile",
 			log.Fields{"numofTech": tpCount, "numPonPorts": f.resourceMgr.DevInfo.GetPonPorts()})
-		return errors.New("Error while populating techprofile mgrs")
+		return errors.New("error while populating techprofile mgrs")
 	log.Infow("Populated techprofile for ponports successfully",
 		log.Fields{"numofTech": tpCount, "numPonPorts": f.resourceMgr.DevInfo.GetPonPorts()})
@@ -638,13 +562,13 @@
 	log.Debugw("Adding HSIA flow", log.Fields{"intfId": intfID, "onuId": onuID, "uniId": uniID, "classifier": classifier,
 		"action": action, "direction": direction, "allocId": allocID, "gemPortId": gemPortID,
 		"logicalFlow": *logicalFlow})
-	var vlan_pit uint32 = 0
+	var vlanPit uint32
 	if _, ok := classifier[VlanPcp]; ok {
-		vlan_pit = classifier[VlanPcp].(uint32)
-		log.Debugw("Found pbit in the flow", log.Fields{"vlan_pit": vlan_pit})
+		vlanPit = classifier[VlanPcp].(uint32)
+		log.Debugw("Found pbit in the flow", log.Fields{"vlan_pit": vlanPit})
 	flowStoreCookie := getFlowStoreCookie(classifier, gemPortID)
-	flowId, err := f.resourceMgr.GetFlowID(intfID, onuID, uniID, gemPortID, flowStoreCookie, HsiaFlow, vlan_pit)
+	flowID, err := f.resourceMgr.GetFlowID(intfID, onuID, uniID, gemPortID, flowStoreCookie, HsiaFlow, vlanPit)
 	if err != nil {
 		log.Errorw("Flow id unavailable for HSIA flow", log.Fields{"direction": direction})
@@ -665,7 +589,7 @@
 	flow := openoltpb2.Flow{AccessIntfId: int32(intfID),
 		OnuId:         int32(onuID),
 		UniId:         int32(uniID),
-		FlowId:        flowId,
+		FlowId:        flowID,
 		FlowType:      direction,
 		AllocId:       int32(allocID),
 		NetworkIntfId: int32(networkIntfID),
@@ -677,7 +601,7 @@
 		PortNo:        portNo}
 	if ok := f.addFlowToDevice(logicalFlow, &flow); ok {
 		log.Debug("HSIA flow added to device successfully", log.Fields{"direction": direction})
-		flowsToKVStore := f.getUpdatedFlowInfo(&flow, flowStoreCookie, HsiaFlow, flowId)
+		flowsToKVStore := f.getUpdatedFlowInfo(&flow, flowStoreCookie, HsiaFlow, flowID)
 		if err := f.updateFlowInfoToKVStore(flow.AccessIntfId,
@@ -848,14 +772,14 @@
 		downlinkAction[PushVlan] = true
 		downlinkAction[VlanVid] = vlanID
 		flowStoreCookie := getFlowStoreCookie(downlinkClassifier, gemPortID)
-		downlinkFlowId, err := f.resourceMgr.GetFlowID(intfID, onuID, uniID, gemPortID, flowStoreCookie, "", 0)
+		downlinkFlowID, err := f.resourceMgr.GetFlowID(intfID, onuID, uniID, gemPortID, flowStoreCookie, "", 0)
 		if err != nil {
 			log.Errorw("flowId unavailable for DL EAPOL",
 				log.Fields{"intfId": intfID, "onuId": onuID, "flowStoreCookie": flowStoreCookie})
 		log.Debugw("Creating DL EAPOL flow",
-			log.Fields{"dl_classifier": downlinkClassifier, "dl_action": downlinkAction, "downlinkFlowId": downlinkFlowId})
+			log.Fields{"dl_classifier": downlinkClassifier, "dl_action": downlinkAction, "downlinkFlowID": downlinkFlowID})
 		if classifierProto = makeOpenOltClassifierField(downlinkClassifier); classifierProto == nil {
 			log.Error("Error in making classifier protobuf for downlink flow")
@@ -868,7 +792,7 @@
 		downstreamFlow = openoltpb2.Flow{AccessIntfId: int32(intfID),
 			OnuId:         int32(onuID),
 			UniId:         int32(uniID),
-			FlowId:        downlinkFlowId,
+			FlowId:        downlinkFlowID,
 			FlowType:      DOWNSTREAM,
 			AllocId:       int32(allocID),
 			NetworkIntfId: int32(networkIntfID),
@@ -881,7 +805,7 @@
 		if ok := f.addFlowToDevice(logicalFlow, &downstreamFlow); ok {
 			log.Debug("EAPOL DL flow added to device successfully")
 			flowCategory := ""
-			flowsToKVStore := f.getUpdatedFlowInfo(&downstreamFlow, flowStoreCookie, flowCategory, downlinkFlowId)
+			flowsToKVStore := f.getUpdatedFlowInfo(&downstreamFlow, flowStoreCookie, flowCategory, downlinkFlowID)
 			if err := f.updateFlowInfoToKVStore(downstreamFlow.AccessIntfId,
@@ -1193,15 +1117,15 @@
 	flowIds := f.resourceMgr.GetCurrentFlowIDsForOnu(ponIntf, onuID, uniID)
 	if len(flowIds) == 0 {
 		log.Debugf("Flow count for subscriber %d is zero", onuID)
-		kvstoreTpId := f.resourceMgr.GetTechProfileIdForOnu(ponIntf, onuID, uniID)
-		if kvstoreTpId == 0 {
+		kvstoreTpID := f.resourceMgr.GetTechProfileIDForOnu(ponIntf, onuID, uniID)
+		if kvstoreTpID == 0 {
 			log.Warnw("Could-not-find-techprofile-tableid-for-uni", log.Fields{"ponIntf": ponIntf, "onuId": onuID, "uniId": uniID})
 		uni := getUniPortPath(ponIntf, onuID, uniID)
-		tpPath := f.getTPpath(ponIntf, uni, kvstoreTpId)
+		tpPath := f.getTPpath(ponIntf, uni, kvstoreTpID)
 		log.Debugw("Getting-techprofile-instance-for-subscriber", log.Fields{"TP-PATH": tpPath})
-		techprofileInst, err := f.techprofile[ponIntf].GetTPInstanceFromKVStore(kvstoreTpId, tpPath)
+		techprofileInst, err := f.techprofile[ponIntf].GetTPInstanceFromKVStore(kvstoreTpID, tpPath)
 		if err != nil { // This should not happen, something wrong in KV backend transaction
 			log.Errorw("Error in fetching tech profile instance from KV store", log.Fields{"tpID": 20, "path": tpPath})
@@ -1263,137 +1187,24 @@
 	var DsMeterID uint32
 	log.Debug("Adding Flow", log.Fields{"flow": flow, "flowMetadata": flowMetadata})
-	for _, field := range utils.GetOfbFields(flow) {
-		if field.Type == utils.ETH_TYPE {
-			classifierInfo[EthType] = field.GetEthType()
-			log.Debug("field-type-eth-type", log.Fields{"classifierInfo[ETH_TYPE]": classifierInfo[EthType].(uint32)})
-		} else if field.Type == utils.IP_PROTO {
-			classifierInfo[IPProto] = field.GetIpProto()
-			log.Debug("field-type-ip-proto", log.Fields{"classifierInfo[IP_PROTO]": classifierInfo[IPProto].(uint32)})
-		} else if field.Type == utils.IN_PORT {
-			classifierInfo[InPort] = field.GetPort()
-			log.Debug("field-type-in-port", log.Fields{"classifierInfo[IN_PORT]": classifierInfo[InPort].(uint32)})
-		} else if field.Type == utils.VLAN_VID {
-			classifierInfo[VlanVid] = field.GetVlanVid()
-			log.Debug("field-type-vlan-vid", log.Fields{"classifierInfo[VLAN_VID]": classifierInfo[VlanVid].(uint32)})
-		} else if field.Type == utils.VLAN_PCP {
-			classifierInfo[VlanPcp] = field.GetVlanPcp()
-			log.Debug("field-type-vlan-pcp", log.Fields{"classifierInfo[VLAN_PCP]": classifierInfo[VlanPcp].(uint32)})
-		} else if field.Type == utils.UDP_DST {
-			classifierInfo[UDPDst] = field.GetUdpDst()
-			log.Debug("field-type-udp-dst", log.Fields{"classifierInfo[UDP_DST]": classifierInfo[UDPDst].(uint32)})
-		} else if field.Type == utils.UDP_SRC {
-			classifierInfo[UDPSrc] = field.GetUdpSrc()
-			log.Debug("field-type-udp-src", log.Fields{"classifierInfo[UDP_SRC]": classifierInfo[UDPSrc].(uint32)})
-		} else if field.Type == utils.IPV4_DST {
-			classifierInfo[Ipv4Dst] = field.GetIpv4Dst()
-			log.Debug("field-type-ipv4-dst", log.Fields{"classifierInfo[IPV4_DST]": classifierInfo[Ipv4Dst].(uint32)})
-		} else if field.Type == utils.IPV4_SRC {
-			classifierInfo[Ipv4Src] = field.GetIpv4Src()
-			log.Debug("field-type-ipv4-src", log.Fields{"classifierInfo[IPV4_SRC]": classifierInfo[Ipv4Src].(uint32)})
-		} else if field.Type == utils.METADATA {
-			classifierInfo[METADATA] = field.GetTableMetadata()
-			log.Debug("field-type-metadata", log.Fields{"classifierInfo[METADATA]": classifierInfo[METADATA].(uint64)})
-		} else if field.Type == utils.TUNNEL_ID {
-			classifierInfo[TunnelID] = field.GetTunnelId()
-			log.Debug("field-type-tunnelId", log.Fields{"classifierInfo[TUNNEL_ID]": classifierInfo[TunnelID].(uint64)})
-		} else {
-			log.Errorw("Un supported field type", log.Fields{"type": field.Type})
-			return
-		}
+	formulateClassifierInfoFromFlow(classifierInfo, flow)
+	err := formulateActionInfoFromFlow(actionInfo, classifierInfo, flow)
+	if err != nil {
+		// Error logging is already done in the called function
+		// So just return in case of error
+		return
-	for _, action := range utils.GetActions(flow) {
-		if action.Type == utils.OUTPUT {
-			if out := action.GetOutput(); out != nil {
-				actionInfo[OUTPUT] = out.GetPort()
-				log.Debugw("action-type-output", log.Fields{"out_port": actionInfo[OUTPUT].(uint32)})
-			} else {
-				log.Error("Invalid output port in action")
-				return
-			}
-		} else if action.Type == utils.POP_VLAN {
-			actionInfo[PopVlan] = true
-			log.Debugw("action-type-pop-vlan", log.Fields{"in_port": classifierInfo[InPort].(uint32)})
-		} else if action.Type == utils.PUSH_VLAN {
-			if out := action.GetPush(); out != nil {
-				if tpid := out.GetEthertype(); tpid != 0x8100 {
-					log.Errorw("Invalid ethertype in push action", log.Fields{"ethertype": actionInfo[PushVlan].(int32)})
-				} else {
-					actionInfo[PushVlan] = true
-					actionInfo[TPID] = tpid
-					log.Debugw("action-type-push-vlan",
-						log.Fields{"push_tpid": actionInfo[TPID].(uint32), "in_port": classifierInfo[InPort].(uint32)})
-				}
-			}
-		} else if action.Type == utils.SET_FIELD {
-			if out := action.GetSetField(); out != nil {
-				if field := out.GetField(); field != nil {
-					if ofClass := field.GetOxmClass(); ofClass != ofp.OfpOxmClass_OFPXMC_OPENFLOW_BASIC {
-						log.Errorw("Invalid openflow class", log.Fields{"class": ofClass})
-						return
-					}
-					/*log.Debugw("action-type-set-field",log.Fields{"field": field, "in_port": classifierInfo[IN_PORT].(uint32)})*/
-					if ofbField := field.GetOfbField(); ofbField != nil {
-						if fieldtype := ofbField.GetType(); fieldtype == ofp.OxmOfbFieldTypes_OFPXMT_OFB_VLAN_VID {
-							if vlan := ofbField.GetVlanVid(); vlan != 0 {
-								actionInfo[VlanVid] = vlan & 0xfff
-								log.Debugw("action-set-vlan-vid", log.Fields{"actionInfo[VLAN_VID]": actionInfo[VlanVid].(uint32)})
-							} else {
-								log.Error("No Invalid vlan id in set vlan-vid action")
-							}
-						} else {
-							log.Errorw("unsupported-action-set-field-type", log.Fields{"type": fieldtype})
-						}
-					}
-				}
-			}
-		} else {
-			log.Errorw("Un supported action type", log.Fields{"type": action.Type})
-			return
-		}
-	}
 	/* Controller bound trap flows */
-	if isControllerFlow := IsControllerBoundFlow(actionInfo[OUTPUT].(uint32)); isControllerFlow {
-		log.Debug("Controller bound trap flows, getting inport from tunnelid")
-		/* Get UNI port/ IN Port from tunnel ID field for upstream controller bound flows  */
-		if portType := IntfIDToPortTypeName(classifierInfo[InPort].(uint32)); portType == voltha.Port_PON_OLT {
-			if uniPort := utils.GetChildPortFromTunnelId(flow); uniPort != 0 {
-				classifierInfo[InPort] = uniPort
-				log.Debugw("upstream pon-to-controller-flow,inport-in-tunnelid", log.Fields{"newInPort": classifierInfo[InPort].(uint32), "outPort": actionInfo[OUTPUT].(uint32)})
-			} else {
-				log.Error("upstream pon-to-controller-flow, NO-inport-in-tunnelid")
-				return
-			}
-		} /*else {
-			log.Debugw("Trap on NNI flow currently not supported", log.Fields{"flow": *flow})
-			return
-		}*/
-	} else {
-		log.Debug("Non-Controller flows, getting uniport from tunnelid")
-		// Downstream flow from NNI to PON port , Use tunnel ID as new OUT port / UNI port
-		if portType := IntfIDToPortTypeName(actionInfo[OUTPUT].(uint32)); portType == voltha.Port_PON_OLT {
-			if uniPort := utils.GetChildPortFromTunnelId(flow); uniPort != 0 {
-				actionInfo[OUTPUT] = uniPort
-				log.Debugw("downstream-nni-to-pon-port-flow, outport-in-tunnelid", log.Fields{"newOutPort": actionInfo[OUTPUT].(uint32), "outPort": actionInfo[OUTPUT].(uint32)})
-			} else {
-				log.Debug("downstream-nni-to-pon-port-flow, no-outport-in-tunnelid", log.Fields{"InPort": classifierInfo[InPort].(uint32), "outPort": actionInfo[OUTPUT].(uint32)})
-				return
-			}
-			// Upstream flow from PON to NNI port , Use tunnel ID as new IN port / UNI port
-		} else if portType := IntfIDToPortTypeName(classifierInfo[InPort].(uint32)); portType == voltha.Port_PON_OLT {
-			if uniPort := utils.GetChildPortFromTunnelId(flow); uniPort != 0 {
-				classifierInfo[InPort] = uniPort
-				log.Debugw("upstream-pon-to-nni-port-flow, inport-in-tunnelid", log.Fields{"newInPort": actionInfo[OUTPUT].(uint32),
-					"outport": actionInfo[OUTPUT].(uint32)})
-			} else {
-				log.Debug("upstream-pon-to-nni-port-flow, no-inport-in-tunnelid", log.Fields{"InPort": classifierInfo[InPort].(uint32),
-					"outPort": actionInfo[OUTPUT].(uint32)})
-				return
-			}
-		}
+	err = formulateControllerBoundTrapFlowInfo(actionInfo, classifierInfo, flow)
+	if err != nil {
+		// error if any, already logged in the called function
+		return
 	log.Infow("Flow ports", log.Fields{"classifierInfo_inport": classifierInfo[InPort], "action_output": actionInfo[OUTPUT]})
-	portNo, intfId, onuId, uniId := ExtractAccessFromFlow(classifierInfo[InPort].(uint32), actionInfo[OUTPUT].(uint32))
+	portNo, intfID, onuID, uniID := ExtractAccessFromFlow(classifierInfo[InPort].(uint32), actionInfo[OUTPUT].(uint32))
 	if ipProto, ok := classifierInfo[IPProto]; ok {
 		if ipProto.(uint32) == IPProtoDhcp {
 			if udpSrc, ok := classifierInfo[UDPSrc]; ok {
@@ -1418,14 +1229,14 @@
 	TpID := utils.GetTechProfileIDFromWriteMetaData(metadata)
-	kvstoreTpId := f.resourceMgr.GetTechProfileIdForOnu(intfId, onuId, uniId)
-	if kvstoreTpId == 0 {
+	kvstoreTpID := f.resourceMgr.GetTechProfileIDForOnu(intfID, onuID, uniID)
+	if kvstoreTpID == 0 {
 		log.Debugf("tpid-not-present-in-kvstore, using tp id %d from flow metadata", TpID)
-	} else if kvstoreTpId != uint32(TpID) {
-		log.Error(" Tech-profile-updates-not-supported", log.Fields{"Tpid-in-flow": TpID, "kvstore-TpId": kvstoreTpId})
+	} else if kvstoreTpID != uint32(TpID) {
+		log.Error(" Tech-profile-updates-not-supported", log.Fields{"Tpid-in-flow": TpID, "kvstore-TpId": kvstoreTpID})
-	log.Debugw("TPID for this subcriber", log.Fields{"TpId": TpID, "pon": intfId, "onuId": onuId, "uniId": uniId})
+	log.Debugw("TPID for this subcriber", log.Fields{"TpId": TpID, "pon": intfID, "onuID": onuID, "uniID": uniID})
 	if IsUpstream(actionInfo[OUTPUT].(uint32)) {
 		UsMeterID = utils.GetMeterIdFromFlow(flow)
 		log.Debugw("Upstream-flow-meter-id", log.Fields{"UsMeterID": UsMeterID})
@@ -1434,7 +1245,7 @@
 		log.Debugw("Downstream-flow-meter-id", log.Fields{"DsMeterID": DsMeterID})
-	f.divideAndAddFlow(intfId, onuId, uniId, portNo, classifierInfo, actionInfo, flow, uint32(TpID), UsMeterID, DsMeterID, flowMetadata)
+	f.divideAndAddFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, uint32(TpID), UsMeterID, DsMeterID, flowMetadata)
 //sendTPDownloadMsgToChild send payload
@@ -1561,13 +1372,13 @@
 	logicalFlow *ofp.OfpFlowStats,
 	gemPorts []uint32,
 	FlowType string,
-	vlanId ...uint32) {
-	log.Debugw("Installing flow on all GEM ports", log.Fields{"FlowType": FlowType, "gemPorts": gemPorts, "vlan": vlanId})
-	for _, gemPortId := range gemPorts {
+	vlanID ...uint32) {
+	log.Debugw("Installing flow on all GEM ports", log.Fields{"FlowType": FlowType, "gemPorts": gemPorts, "vlan": vlanID})
+	for _, gemPortID := range gemPorts {
 		if FlowType == HsiaFlow || FlowType == DhcpFlow {
-			f1(args["intfId"], args["onuId"], args["uniId"], args["portNo"], classifier, action, logicalFlow, args["allocId"], gemPortId)
+			f1(args["intfId"], args["onuId"], args["uniId"], args["portNo"], classifier, action, logicalFlow, args["allocId"], gemPortID)
 		} else if FlowType == EapolFlow {
-			f2(args["intfId"], args["onuId"], args["uniId"], args["portNo"], logicalFlow, args["allocId"], gemPortId, vlanId[0])
+			f2(args["intfId"], args["onuId"], args["uniId"], args["portNo"], logicalFlow, args["allocId"], gemPortID, vlanID[0])
 		} else {
 			log.Errorw("Unrecognized Flow Type", log.Fields{"FlowType": FlowType})
@@ -1643,3 +1454,232 @@
+func verifyMeterIDAndGetDirection(MeterID uint32, Dir tp_pb.Direction) (string, error) {
+	if MeterID == 0 { // This should never happen
+		log.Error("Invalid meter id")
+		return "", errors.New("invalid meter id")
+	}
+	if Dir == tp_pb.Direction_UPSTREAM {
+		return "upstream", nil
+	} else if Dir == tp_pb.Direction_DOWNSTREAM {
+		return "downstream", nil
+	}
+	return "", nil
+func (f *OpenOltFlowMgr) checkAndAddFlow(args map[string]uint32, classifierInfo map[string]interface{},
+	actionInfo map[string]interface{}, flow *ofp.OfpFlowStats, gemPort, intfID, onuID, uniID, portNo uint32,
+	TpInst *tp.TechProfile, allocID []uint32, gemPorts []uint32, TpID uint32, uni string) {
+	if ipProto, ok := classifierInfo[IPProto]; ok {
+		if ipProto.(uint32) == IPProtoDhcp {
+			log.Info("Adding DHCP flow")
+			if pcp, ok := classifierInfo[VlanPcp]; ok {
+				gemPort = f.techprofile[intfID].GetGemportIDForPbit(TpInst,
+					tp_pb.Direction_UPSTREAM,
+					pcp.(uint32))
+				//Adding DHCP upstream flow
+				f.addDHCPTrapFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocID[0], gemPort)
+			} else {
+				//Adding DHCP upstream flow to all gemports
+				installFlowOnAllGemports(f.addDHCPTrapFlow, nil, args, classifierInfo, actionInfo, flow, gemPorts, DhcpFlow)
+			}
+		} else if ipProto == IgmpProto {
+			log.Info("igmp flow add ignored, not implemented yet")
+			return
+		} else {
+			log.Errorw("Invalid-Classifier-to-handle", log.Fields{"classifier": classifierInfo, "action": actionInfo})
+			return
+		}
+	} else if ethType, ok := classifierInfo[EthType]; ok {
+		if ethType.(uint32) == EapEthType {
+			log.Info("Adding EAPOL flow")
+			var vlanID uint32
+			if val, ok := classifierInfo[VlanVid]; ok {
+				vlanID = (val.(uint32)) & VlanvIDMask
+			} else {
+				vlanID = DefaultMgmtVlan
+			}
+			if pcp, ok := classifierInfo[VlanPcp]; ok {
+				gemPort = f.techprofile[intfID].GetGemportIDForPbit(TpInst,
+					tp_pb.Direction_UPSTREAM,
+					pcp.(uint32))
+				f.addEAPOLFlow(intfID, onuID, uniID, portNo, flow, allocID[0], gemPort, vlanID)
+			} else {
+				installFlowOnAllGemports(nil, f.addEAPOLFlow, args, classifierInfo, actionInfo, flow, gemPorts, EapolFlow, vlanID)
+			}
+		}
+		if ethType == LldpEthType {
+			log.Info("Adding LLDP flow")
+			addLLDPFlow(flow, portNo)
+			return
+		}
+	} else if _, ok := actionInfo[PushVlan]; ok {
+		log.Info("Adding upstream data rule")
+		if pcp, ok := classifierInfo[VlanPcp]; ok {
+			gemPort = f.techprofile[intfID].GetGemportIDForPbit(TpInst,
+				tp_pb.Direction_UPSTREAM,
+				pcp.(uint32))
+			//Adding HSIA upstream flow
+			f.addUpstreamDataFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocID[0], gemPort)
+		} else {
+			//Adding HSIA upstream flow to all gemports
+			installFlowOnAllGemports(f.addUpstreamDataFlow, nil, args, classifierInfo, actionInfo, flow, gemPorts, HsiaFlow)
+		}
+	} else if _, ok := actionInfo[PopVlan]; ok {
+		log.Info("Adding Downstream data rule")
+		if pcp, ok := classifierInfo[VlanPcp]; ok {
+			gemPort = f.techprofile[intfID].GetGemportIDForPbit(TpInst,
+				tp_pb.Direction_UPSTREAM,
+				pcp.(uint32))
+			//Adding HSIA downstream flow
+			f.addDownstreamDataFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocID[0], gemPort)
+		} else {
+			//Adding HSIA downstream flow to all gemports
+			installFlowOnAllGemports(f.addDownstreamDataFlow, nil, args, classifierInfo, actionInfo, flow, gemPorts, HsiaFlow)
+		}
+	} else {
+		log.Errorw("Invalid-flow-type-to-handle", log.Fields{"classifier": classifierInfo, "action": actionInfo, "flow": flow})
+		return
+	}
+	// Send Techprofile download event to child device in go routine as it takes time
+	go f.sendTPDownloadMsgToChild(intfID, onuID, uniID, uni, TpID)
+func formulateClassifierInfoFromFlow(classifierInfo map[string]interface{}, flow *ofp.OfpFlowStats) {
+	for _, field := range utils.GetOfbFields(flow) {
+		if field.Type == utils.ETH_TYPE {
+			classifierInfo[EthType] = field.GetEthType()
+			log.Debug("field-type-eth-type", log.Fields{"classifierInfo[ETH_TYPE]": classifierInfo[EthType].(uint32)})
+		} else if field.Type == utils.IP_PROTO {
+			classifierInfo[IPProto] = field.GetIpProto()
+			log.Debug("field-type-ip-proto", log.Fields{"classifierInfo[IP_PROTO]": classifierInfo[IPProto].(uint32)})
+		} else if field.Type == utils.IN_PORT {
+			classifierInfo[InPort] = field.GetPort()
+			log.Debug("field-type-in-port", log.Fields{"classifierInfo[IN_PORT]": classifierInfo[InPort].(uint32)})
+		} else if field.Type == utils.VLAN_VID {
+			classifierInfo[VlanVid] = field.GetVlanVid()
+			log.Debug("field-type-vlan-vid", log.Fields{"classifierInfo[VLAN_VID]": classifierInfo[VlanVid].(uint32)})
+		} else if field.Type == utils.VLAN_PCP {
+			classifierInfo[VlanPcp] = field.GetVlanPcp()
+			log.Debug("field-type-vlan-pcp", log.Fields{"classifierInfo[VLAN_PCP]": classifierInfo[VlanPcp].(uint32)})
+		} else if field.Type == utils.UDP_DST {
+			classifierInfo[UDPDst] = field.GetUdpDst()
+			log.Debug("field-type-udp-dst", log.Fields{"classifierInfo[UDP_DST]": classifierInfo[UDPDst].(uint32)})
+		} else if field.Type == utils.UDP_SRC {
+			classifierInfo[UDPSrc] = field.GetUdpSrc()
+			log.Debug("field-type-udp-src", log.Fields{"classifierInfo[UDP_SRC]": classifierInfo[UDPSrc].(uint32)})
+		} else if field.Type == utils.IPV4_DST {
+			classifierInfo[Ipv4Dst] = field.GetIpv4Dst()
+			log.Debug("field-type-ipv4-dst", log.Fields{"classifierInfo[IPV4_DST]": classifierInfo[Ipv4Dst].(uint32)})
+		} else if field.Type == utils.IPV4_SRC {
+			classifierInfo[Ipv4Src] = field.GetIpv4Src()
+			log.Debug("field-type-ipv4-src", log.Fields{"classifierInfo[IPV4_SRC]": classifierInfo[Ipv4Src].(uint32)})
+		} else if field.Type == utils.METADATA {
+			classifierInfo[METADATA] = field.GetTableMetadata()
+			log.Debug("field-type-metadata", log.Fields{"classifierInfo[METADATA]": classifierInfo[METADATA].(uint64)})
+		} else if field.Type == utils.TUNNEL_ID {
+			classifierInfo[TunnelID] = field.GetTunnelId()
+			log.Debug("field-type-tunnelId", log.Fields{"classifierInfo[TUNNEL_ID]": classifierInfo[TunnelID].(uint64)})
+		} else {
+			log.Errorw("Un supported field type", log.Fields{"type": field.Type})
+			return
+		}
+	}
+func formulateActionInfoFromFlow(actionInfo, classifierInfo map[string]interface{}, flow *ofp.OfpFlowStats) error {
+	for _, action := range utils.GetActions(flow) {
+		if action.Type == utils.OUTPUT {
+			if out := action.GetOutput(); out != nil {
+				actionInfo[OUTPUT] = out.GetPort()
+				log.Debugw("action-type-output", log.Fields{"out_port": actionInfo[OUTPUT].(uint32)})
+			} else {
+				log.Error("Invalid output port in action")
+				return errors.New("invalid output port in action")
+			}
+		} else if action.Type == utils.POP_VLAN {
+			actionInfo[PopVlan] = true
+			log.Debugw("action-type-pop-vlan", log.Fields{"in_port": classifierInfo[InPort].(uint32)})
+		} else if action.Type == utils.PUSH_VLAN {
+			if out := action.GetPush(); out != nil {
+				if tpid := out.GetEthertype(); tpid != 0x8100 {
+					log.Errorw("Invalid ethertype in push action", log.Fields{"ethertype": actionInfo[PushVlan].(int32)})
+				} else {
+					actionInfo[PushVlan] = true
+					actionInfo[TPID] = tpid
+					log.Debugw("action-type-push-vlan",
+						log.Fields{"push_tpid": actionInfo[TPID].(uint32), "in_port": classifierInfo[InPort].(uint32)})
+				}
+			}
+		} else if action.Type == utils.SET_FIELD {
+			if out := action.GetSetField(); out != nil {
+				if field := out.GetField(); field != nil {
+					if ofClass := field.GetOxmClass(); ofClass != ofp.OfpOxmClass_OFPXMC_OPENFLOW_BASIC {
+						log.Errorw("Invalid openflow class", log.Fields{"class": ofClass})
+						return errors.New("invalid openflow class")
+					}
+					/*log.Debugw("action-type-set-field",log.Fields{"field": field, "in_port": classifierInfo[IN_PORT].(uint32)})*/
+					if ofbField := field.GetOfbField(); ofbField != nil {
+						if fieldtype := ofbField.GetType(); fieldtype == ofp.OxmOfbFieldTypes_OFPXMT_OFB_VLAN_VID {
+							if vlan := ofbField.GetVlanVid(); vlan != 0 {
+								actionInfo[VlanVid] = vlan & 0xfff
+								log.Debugw("action-set-vlan-vid", log.Fields{"actionInfo[VLAN_VID]": actionInfo[VlanVid].(uint32)})
+							} else {
+								log.Error("No Invalid vlan id in set vlan-vid action")
+							}
+						} else {
+							log.Errorw("unsupported-action-set-field-type", log.Fields{"type": fieldtype})
+						}
+					}
+				}
+			}
+		} else {
+			log.Errorw("Un supported action type", log.Fields{"type": action.Type})
+			return errors.New("un supported action type")
+		}
+	}
+	return nil
+func formulateControllerBoundTrapFlowInfo(actionInfo, classifierInfo map[string]interface{}, flow *ofp.OfpFlowStats) error {
+	if isControllerFlow := IsControllerBoundFlow(actionInfo[OUTPUT].(uint32)); isControllerFlow {
+		log.Debug("Controller bound trap flows, getting inport from tunnelid")
+		/* Get UNI port/ IN Port from tunnel ID field for upstream controller bound flows  */
+		if portType := IntfIDToPortTypeName(classifierInfo[InPort].(uint32)); portType == voltha.Port_PON_OLT {
+			if uniPort := utils.GetChildPortFromTunnelId(flow); uniPort != 0 {
+				classifierInfo[InPort] = uniPort
+				log.Debugw("upstream pon-to-controller-flow,inport-in-tunnelid", log.Fields{"newInPort": classifierInfo[InPort].(uint32), "outPort": actionInfo[OUTPUT].(uint32)})
+			} else {
+				log.Error("upstream pon-to-controller-flow, NO-inport-in-tunnelid")
+				return errors.New("upstream pon-to-controller-flow, NO-inport-in-tunnelid")
+			}
+		}
+	} else {
+		log.Debug("Non-Controller flows, getting uniport from tunnelid")
+		// Downstream flow from NNI to PON port , Use tunnel ID as new OUT port / UNI port
+		if portType := IntfIDToPortTypeName(actionInfo[OUTPUT].(uint32)); portType == voltha.Port_PON_OLT {
+			if uniPort := utils.GetChildPortFromTunnelId(flow); uniPort != 0 {
+				actionInfo[OUTPUT] = uniPort
+				log.Debugw("downstream-nni-to-pon-port-flow, outport-in-tunnelid", log.Fields{"newOutPort": actionInfo[OUTPUT].(uint32), "outPort": actionInfo[OUTPUT].(uint32)})
+			} else {
+				log.Debug("downstream-nni-to-pon-port-flow, no-outport-in-tunnelid", log.Fields{"InPort": classifierInfo[InPort].(uint32), "outPort": actionInfo[OUTPUT].(uint32)})
+				return errors.New("downstream-nni-to-pon-port-flow, no-outport-in-tunnelid")
+			}
+			// Upstream flow from PON to NNI port , Use tunnel ID as new IN port / UNI port
+		} else if portType := IntfIDToPortTypeName(classifierInfo[InPort].(uint32)); portType == voltha.Port_PON_OLT {
+			if uniPort := utils.GetChildPortFromTunnelId(flow); uniPort != 0 {
+				classifierInfo[InPort] = uniPort
+				log.Debugw("upstream-pon-to-nni-port-flow, inport-in-tunnelid", log.Fields{"newInPort": actionInfo[OUTPUT].(uint32),
+					"outport": actionInfo[OUTPUT].(uint32)})
+			} else {
+				log.Debug("upstream-pon-to-nni-port-flow, no-inport-in-tunnelid", log.Fields{"InPort": classifierInfo[InPort].(uint32),
+					"outPort": actionInfo[OUTPUT].(uint32)})
+				return errors.New("upstream-pon-to-nni-port-flow, no-inport-in-tunnelid")
+			}
+		}
+	}
+	return nil
diff --git a/adaptercore/resourcemanager/resourcemanager.go b/adaptercore/resourcemanager/resourcemanager.go
index dc5300a..bb78ebd 100755
--- a/adaptercore/resourcemanager/resourcemanager.go
+++ b/adaptercore/resourcemanager/resourcemanager.go
@@ -32,14 +32,17 @@
-// KvstoreTimeout specifies the time out for KV Store Connection
-const KvstoreTimeout = 5
+const (
+	// KvstoreTimeout specifies the time out for KV Store Connection
+	KvstoreTimeout = 5
+	// BasePathKvStore - service/voltha/openolt/<device_id>
+	BasePathKvStore = "service/voltha/openolt/{%s}"
+	// TpIDPathSuffix - tp_id/<(pon_id, onu_id, uni_id)>
+	TpIDPathSuffix = "tp_id/{%d,%d,%d}"
+	//MeterIDPathSuffix - meter_id/<(pon_id, onu_id, uni_id)>/<direction>
+	MeterIDPathSuffix = "meter_id/{%d,%d,%d}/{%s}"
-// BasePathKvStore - service/voltha/openolt/<device_id>
-const BasePathKvStore = "service/voltha/openolt/{%s}"
-const TP_ID_PATH_SUFFIX = "tp_id/{%d,%d,%d}"            // tp_id/<(pon_id, onu_id, uni_id)>
-const METER_ID_PATH_SUFFIX = "meter_id/{%d,%d,%d}/{%s}" // meter_id/<(pon_id, onu_id, uni_id)>/<direction>
 // FlowInfo holds the flow information
 type FlowInfo struct {
 	Flow            *openolt.Flow
@@ -427,22 +430,9 @@
 		log.Debugw("Found flowId(s) for this ONU", log.Fields{"pon": ponIntfID, "ONUID": ONUID, "uniID": uniID, "KVpath": FlowPath})
 		for _, flowID := range FlowIDs {
 			FlowInfo := RsrcMgr.GetFlowIDInfo(ponIntfID, ONUID, uniID, uint32(flowID))
-			if FlowInfo != nil {
-				for _, Info := range *FlowInfo {
-					if int32(gemportID) == Info.Flow.GemportId && flowCategory != "" && Info.FlowCategory == flowCategory {
-						log.Debug("Found flow matching with flow catagory", log.Fields{"flowId": flowID, "FlowCategory": flowCategory})
-						if Info.FlowCategory == "HSIA_FLOW" && Info.Flow.Classifier.OPbits == vlanPcp[0] {
-							log.Debug("Found matching vlan pcp ", log.Fields{"flowId": flowID, "Vlanpcp": vlanPcp[0]})
-							return flowID, nil
-						}
-					}
-					if int32(gemportID) == Info.Flow.GemportId && flowStoreCookie != 0 && Info.FlowStoreCookie == flowStoreCookie {
-						if flowCategory != "" && Info.FlowCategory == flowCategory {
-							log.Debug("Found flow matching with flow catagory", log.Fields{"flowId": flowID, "FlowCategory": flowCategory})
-							return flowID, nil
-						}
-					}
-				}
+			er := getFlowIDFromFlowInfo(FlowInfo, flowID, gemportID, flowStoreCookie, flowCategory, vlanPcp...)
+			if er == nil {
+				return flowID, er
@@ -704,10 +694,12 @@
 	return false
-func (RMgr *OpenOltResourceMgr) GetTechProfileIdForOnu(IntfId uint32, OnuId uint32, UniId uint32) uint32 {
-	Path := fmt.Sprintf(TP_ID_PATH_SUFFIX, IntfId, OnuId, UniId)
+// GetTechProfileIDForOnu fetches Tech-Profile-ID from the KV-Store for the given onu based on the path
+// This path is formed as the following: tp_id/{IntfID, OnuID, UniID}
+func (RsrcMgr *OpenOltResourceMgr) GetTechProfileIDForOnu(IntfID uint32, OnuID uint32, UniID uint32) uint32 {
+	Path := fmt.Sprintf(TpIDPathSuffix, IntfID, OnuID, UniID)
 	var Data uint32
-	Value, err := RMgr.KVStore.Get(Path)
+	Value, err := RsrcMgr.KVStore.Get(Path)
 	if err == nil {
 		if Value != nil {
 			Val, err := kvstore.ToByte(Value.Value)
@@ -728,67 +720,75 @@
-func (RMgr *OpenOltResourceMgr) RemoveTechProfileIdForOnu(IntfId uint32, OnuId uint32, UniId uint32) error {
-	IntfOnuUniId := fmt.Sprintf(TP_ID_PATH_SUFFIX, IntfId, OnuId, UniId)
-	if err := RMgr.KVStore.Delete(IntfOnuUniId); err != nil {
-		log.Error("Failed to delete techprofile id resource %s in KV store", IntfOnuUniId)
+// RemoveTechProfileIDForOnu deletes the tech-profile-id  from the KV-Store for the given onu based on the path
+// This path is formed as the following: tp_id/{IntfID, OnuID, UniID}
+func (RsrcMgr *OpenOltResourceMgr) RemoveTechProfileIDForOnu(IntfID uint32, OnuID uint32, UniID uint32) error {
+	IntfOnuUniID := fmt.Sprintf(TpIDPathSuffix, IntfID, OnuID, UniID)
+	if err := RsrcMgr.KVStore.Delete(IntfOnuUniID); err != nil {
+		log.Error("Failed to delete techprofile id resource %s in KV store", IntfOnuUniID)
 		return err
 	return nil
-func (RMgr *OpenOltResourceMgr) UpdateTechProfileIdForOnu(IntfId uint32, OnuId uint32,
-	UniId uint32, TpId uint32) error {
+//UpdateTechProfileIDForOnu updates (put) already present tech-profile-id for the given onu based on the path
+// This path is formed as the following: tp_id/{IntfID, OnuID, UniID}
+func (RsrcMgr *OpenOltResourceMgr) UpdateTechProfileIDForOnu(IntfID uint32, OnuID uint32,
+	UniID uint32, TpID uint32) error {
 	var Value []byte
 	var err error
-	IntfOnuUniId := fmt.Sprintf(TP_ID_PATH_SUFFIX, IntfId, OnuId, UniId)
-	log.Debugf("updating tp id %d on path %s", TpId, IntfOnuUniId)
-	Value, err = json.Marshal(TpId)
+	IntfOnuUniID := fmt.Sprintf(TpIDPathSuffix, IntfID, OnuID, UniID)
+	log.Debugf("updating tp id %d on path %s", TpID, IntfOnuUniID)
+	Value, err = json.Marshal(TpID)
 	if err != nil {
 		log.Error("failed to Marshal")
 		return err
-	if err = RMgr.KVStore.Put(IntfOnuUniId, Value); err != nil {
-		log.Errorf("Failed to update resource %s", IntfOnuUniId)
+	if err = RsrcMgr.KVStore.Put(IntfOnuUniID, Value); err != nil {
+		log.Errorf("Failed to update resource %s", IntfOnuUniID)
 		return err
 	return err
-func (RMgr *OpenOltResourceMgr) UpdateMeterIdForOnu(Direction string, IntfId uint32, OnuId uint32,
-	UniId uint32, MeterConfig *ofp.OfpMeterConfig) error {
+// UpdateMeterIDForOnu updates the meter id in the KV-Store for the given onu based on the path
+// This path is formed as the following: tp_id/{IntfID, OnuID, UniID}/direction
+func (RsrcMgr *OpenOltResourceMgr) UpdateMeterIDForOnu(Direction string, IntfID uint32, OnuID uint32,
+	UniID uint32, MeterConfig *ofp.OfpMeterConfig) error {
 	var Value []byte
 	var err error
-	IntfOnuUniId := fmt.Sprintf(METER_ID_PATH_SUFFIX, IntfId, OnuId, UniId, Direction)
+	IntfOnuUniID := fmt.Sprintf(MeterIDPathSuffix, IntfID, OnuID, UniID, Direction)
 	Value, err = json.Marshal(*MeterConfig)
 	if err != nil {
 		log.Error("failed to Marshal meter config")
 		return err
-	if err = RMgr.KVStore.Put(IntfOnuUniId, Value); err != nil {
-		log.Errorf("Failed to store meter into KV store %s", IntfOnuUniId)
+	if err = RsrcMgr.KVStore.Put(IntfOnuUniID, Value); err != nil {
+		log.Errorf("Failed to store meter into KV store %s", IntfOnuUniID)
 		return err
 	return err
-func (RMgr *OpenOltResourceMgr) GetMeterIdForOnu(Direction string, IntfId uint32, OnuId uint32, UniId uint32) (*ofp.OfpMeterConfig, error) {
-	Path := fmt.Sprintf(METER_ID_PATH_SUFFIX, IntfId, OnuId, UniId, Direction)
+// GetMeterIDForOnu fetches the meter-id fromthe kv store for the given onu based on the path
+// This path is formed as the following: tp_id/{IntfID, OnuID, UniID}/direction
+func (RsrcMgr *OpenOltResourceMgr) GetMeterIDForOnu(Direction string, IntfID uint32, OnuID uint32, UniID uint32) (*ofp.OfpMeterConfig, error) {
+	Path := fmt.Sprintf(MeterIDPathSuffix, IntfID, OnuID, UniID, Direction)
 	var meterConfig ofp.OfpMeterConfig
-	Value, err := RMgr.KVStore.Get(Path)
+	Value, err := RsrcMgr.KVStore.Get(Path)
 	if err == nil {
 		if Value != nil {
 			log.Debug("Found meter in KV store", log.Fields{"Direction": Direction})
-			Val, err := kvstore.ToByte(Value.Value)
-			if err != nil {
-				log.Errorw("Failed to convert into byte array", log.Fields{"error": err})
-				return nil, err
+			Val, er := kvstore.ToByte(Value.Value)
+			if er != nil {
+				log.Errorw("Failed to convert into byte array", log.Fields{"error": er})
+				return nil, er
-			if err = json.Unmarshal(Val, &meterConfig); err != nil {
-				log.Error("Failed to unmarshal meterconfig", log.Fields{"error": err})
-				return nil, err
+			if er = json.Unmarshal(Val, &meterConfig); er != nil {
+				log.Error("Failed to unmarshal meterconfig", log.Fields{"error": er})
+				return nil, er
 		} else {
@@ -801,11 +801,35 @@
 	return &meterConfig, err
-func (RMgr *OpenOltResourceMgr) RemoveMeterIdForOnu(Direction string, IntfId uint32, OnuId uint32, UniId uint32) error {
-	Path := fmt.Sprintf(METER_ID_PATH_SUFFIX, IntfId, OnuId, UniId, Direction)
-	if err := RMgr.KVStore.Delete(Path); err != nil {
+// RemoveMeterIDForOnu deletes the meter-id from the kV-Store for the given onu based on the path
+// This path is formed as the following: tp_id/{IntfID, OnuID, UniID}/direction
+func (RsrcMgr *OpenOltResourceMgr) RemoveMeterIDForOnu(Direction string, IntfID uint32, OnuID uint32, UniID uint32) error {
+	Path := fmt.Sprintf(MeterIDPathSuffix, IntfID, OnuID, UniID, Direction)
+	if err := RsrcMgr.KVStore.Delete(Path); err != nil {
 		log.Errorf("Failed to delete meter id %s from kvstore ", Path)
 		return err
 	return nil
+func getFlowIDFromFlowInfo(FlowInfo *[]FlowInfo, flowID, gemportID uint32, flowStoreCookie uint64, flowCategory string, vlanPcp ...uint32) error {
+	if FlowInfo != nil {
+		for _, Info := range *FlowInfo {
+			if int32(gemportID) == Info.Flow.GemportId && flowCategory != "" && Info.FlowCategory == flowCategory {
+				log.Debug("Found flow matching with flow category", log.Fields{"flowId": flowID, "FlowCategory": flowCategory})
+				if Info.FlowCategory == "HSIA_FLOW" && Info.Flow.Classifier.OPbits == vlanPcp[0] {
+					log.Debug("Found matching vlan pcp ", log.Fields{"flowId": flowID, "Vlanpcp": vlanPcp[0]})
+					return nil
+				}
+			}
+			if int32(gemportID) == Info.Flow.GemportId && flowStoreCookie != 0 && Info.FlowStoreCookie == flowStoreCookie {
+				if flowCategory != "" && Info.FlowCategory == flowCategory {
+					log.Debug("Found flow matching with flow category", log.Fields{"flowId": flowID, "FlowCategory": flowCategory})
+					return nil
+				}
+			}
+		}
+	}
+	log.Errorw("invalid flow-info", log.Fields{"flow_info": FlowInfo})
+	return errors.New("invalid flow-info")