[VOL-5255] Use all uplink interfaces, not only interface 0, at the OLT

Raising a new MR after addressing review comments on the original MR.
Link to the original MR:
https://gerrit.opencord.org/c/voltha-openolt-adapter/+/34993

Change-Id: Ic2b1589db87755ca62f656e18bc5e314c3e85cd8
Signed-off-by: Sridhar Ravindra <sridhar.ravindra@radisys.com>
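
What the change does: the NNI interface ID is now derived from each flow's
OpenFlow port number and carried through processAddFlow/createTcontGemports
into schedQueue, so traffic schedulers and queues are created against the
flow's actual uplink (TrafficQueues.NetworkIntfId) rather than always NNI 0.
The same ID is passed to clearResources on flow removal, and the stats manager
now initializes one NniPort per NNI port reported by the device.

Below is a minimal sketch (not part of the patch) of the port-number decoding
the diff repeats inline. The 0x1f mask and the 16777220 -> 4 example come from
the diff itself; the helper name nniIntfIDFromPortNo and the 1<<24 offset
framing are illustrative assumptions, not code from this repository:

    package main

    import "fmt"

    // nniIntfIDFromPortNo is a hypothetical helper mirroring the inline
    // "& 0x1f" logic in the diff. NNI port numbers are assumed to be the
    // interface index plus a 1<<24 (16777216) offset, so masking the low
    // five bits recovers the index; the mask caps this at 32 NNI interfaces.
    func nniIntfIDFromPortNo(portNo uint32) uint32 {
        return portNo & 0x1f
    }

    func main() {
        fmt.Println(nniIntfIDFromPortNo(16777220)) // 16777220 == 1<<24 | 4, prints 4
    }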
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index 7389cb7..5b4a474 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -164,6 +164,7 @@
 	flowMetadata *ofp.FlowMetadata
 	direction    tp_pb.Direction
 	intfID       uint32
+	nniIntfID    uint32
 	onuID        uint32
 	uniID        uint32
 	tpID         uint32
@@ -283,7 +284,7 @@
 	return nil
 }
 
-func (f *OpenOltFlowMgr) processAddFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32,
+func (f *OpenOltFlowMgr) processAddFlow(ctx context.Context, intfID uint32, nniIntfID uint32, onuID uint32, uniID uint32, portNo uint32,
 	classifierInfo map[string]interface{}, actionInfo map[string]interface{}, flow *ofp.OfpFlowStats, TpID uint32,
 	UsMeterID uint32, DsMeterID uint32, flowMetadata *ofp.FlowMetadata) error {
 	var allocID uint32
@@ -332,7 +333,7 @@
 		"usmeter-id": UsMeterID,
 		"dsmeter-id": DsMeterID,
 		"tp-id":      TpID})
-	allocID, gemPorts, TpInst = f.createTcontGemports(ctx, intfID, onuID, uniID, uni, portNo, TpID, UsMeterID, DsMeterID, flowMetadata)
+	allocID, gemPorts, TpInst = f.createTcontGemports(ctx, intfID, nniIntfID, onuID, uniID, uni, portNo, TpID, UsMeterID, DsMeterID, flowMetadata)
 	if allocID == 0 || gemPorts == nil || TpInst == nil {
 		logger.Error(ctx, "alloc-id-gem-ports-tp-unavailable")
 		return olterrors.NewErrNotFound(
@@ -358,6 +359,7 @@
 	logger.Debugw(ctx, "CreateSchedulerQueues",
 		log.Fields{"dir": sq.direction,
 			"intf-id":      sq.intfID,
+			"nniIntfID":    sq.nniIntfID,
 			"onu-id":       sq.onuID,
 			"uni-id":       sq.uniID,
 			"tp-id":        sq.tpID,
@@ -512,7 +514,7 @@
 		log.Fields{"direction": sq.direction,
 			"traffic-queues": trafficQueues,
 			"device-id":      f.deviceHandler.device.Id})
-	queues := &tp_pb.TrafficQueues{IntfId: sq.intfID, OnuId: sq.onuID,
+	queues := &tp_pb.TrafficQueues{IntfId: sq.intfID, NetworkIntfId: sq.nniIntfID, OnuId: sq.onuID,
 		UniId: sq.uniID, PortNo: sq.uniPort,
 		TrafficQueues: trafficQueues,
 		TechProfileId: TrafficSched[0].TechProfileId}
@@ -543,6 +545,7 @@
 				"TrafficScheds": TrafficSched,
 				"device-id":     f.deviceHandler.device.Id,
 				"intfID":        sq.intfID,
+				"nniIntfID":     sq.nniIntfID,
 				"onuID":         sq.onuID,
 				"uniID":         sq.uniID})
 		if _, err := f.deviceHandler.Client.CreateTrafficSchedulers(ctx, &tp_pb.TrafficSchedulers{
@@ -564,7 +567,8 @@
 			"traffic-queues": trafficQueues,
 			"device-id":      f.deviceHandler.device.Id})
 	queues := &tp_pb.TrafficQueues{IntfId: sq.intfID, OnuId: sq.onuID,
-		UniId: sq.uniID, PortNo: sq.uniPort,
+		NetworkIntfId: sq.nniIntfID,
+		UniId:         sq.uniID, PortNo: sq.uniPort,
 		TrafficQueues: trafficQueues,
 		TechProfileId: TrafficSched[0].TechProfileId}
 	if _, err := f.deviceHandler.Client.CreateTrafficQueues(ctx, queues); err != nil {
@@ -613,6 +617,7 @@
 		log.Fields{
 			"direction": sq.direction,
 			"intf-id":   sq.intfID,
+			"nniIntfID": sq.nniIntfID,
 			"onu-id":    sq.onuID,
 			"uni-id":    sq.uniID,
 			"uni-port":  sq.uniPort,
@@ -629,7 +634,8 @@
 
 	if _, err = f.deviceHandler.Client.RemoveTrafficQueues(ctx,
 		&tp_pb.TrafficQueues{IntfId: sq.intfID, OnuId: sq.onuID,
-			UniId: sq.uniID, PortNo: sq.uniPort,
+			NetworkIntfId: sq.nniIntfID,
+			UniId:         sq.uniID, PortNo: sq.uniPort,
 			TrafficQueues: TrafficQueues,
 			TechProfileId: sq.tpID}); err != nil {
 		return olterrors.NewErrAdapter("unable-to-remove-traffic-queues-from-device",
@@ -799,7 +805,7 @@
 }
 
 // This function allocates tconts and GEM ports for an ONU
-func (f *OpenOltFlowMgr) createTcontGemports(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, uni string, uniPort uint32, TpID uint32, UsMeterID uint32, DsMeterID uint32, flowMetadata *ofp.FlowMetadata) (uint32, []uint32, interface{}) {
+func (f *OpenOltFlowMgr) createTcontGemports(ctx context.Context, intfID uint32, nniIntfID uint32, onuID uint32, uniID uint32, uni string, uniPort uint32, TpID uint32, UsMeterID uint32, DsMeterID uint32, flowMetadata *ofp.FlowMetadata) (uint32, []uint32, interface{}) {
 	var allocIDs []uint32
 	var allgemPortIDs []uint32
 	var gemPortIDs []uint32
@@ -847,7 +853,7 @@
 	switch tpInst := techProfileInstance.(type) {
 	case *tp_pb.TechProfileInstance:
 		if UsMeterID != 0 {
-			sq := schedQueue{direction: tp_pb.Direction_UPSTREAM, intfID: intfID, onuID: onuID, uniID: uniID, tpID: TpID,
+			sq := schedQueue{direction: tp_pb.Direction_UPSTREAM, intfID: intfID, nniIntfID: nniIntfID, onuID: onuID, uniID: uniID, tpID: TpID,
 				uniPort: uniPort, tpInst: techProfileInstance, meterID: UsMeterID, flowMetadata: flowMetadata}
 			if err := f.CreateSchedulerQueues(ctx, sq); err != nil {
 				logger.Errorw(ctx, "CreateSchedulerQueues-failed-upstream",
@@ -863,7 +869,7 @@
 			}
 		}
 		if DsMeterID != 0 {
-			sq := schedQueue{direction: tp_pb.Direction_DOWNSTREAM, intfID: intfID, onuID: onuID, uniID: uniID, tpID: TpID,
+			sq := schedQueue{direction: tp_pb.Direction_DOWNSTREAM, intfID: intfID, nniIntfID: nniIntfID, onuID: onuID, uniID: uniID, tpID: TpID,
 				uniPort: uniPort, tpInst: techProfileInstance, meterID: DsMeterID, flowMetadata: flowMetadata}
 			if err := f.CreateSchedulerQueues(ctx, sq); err != nil {
 				logger.Errorw(ctx, "CreateSchedulerQueues-failed-downstream",
@@ -1907,7 +1913,7 @@
 // clearResources clears pon resources in kv store and the device
 // nolint: gocyclo
 func (f *OpenOltFlowMgr) clearResources(ctx context.Context, intfID uint32, onuID int32, uniID int32,
-	flowID uint64, portNum uint32, tpID uint32, sendDeleteGemRequest bool) error {
+	flowID uint64, portNum uint32, tpID uint32, sendDeleteGemRequest bool, nniIntfID uint32) error {
 	logger.Debugw(ctx, "clearing-resources", log.Fields{"intfID": intfID, "onuID": onuID, "uniID": uniID, "tpID": tpID})
 
 	uni := getUniPortPath(f.deviceHandler.device.Id, intfID, onuID, uniID)
@@ -1996,7 +2002,7 @@
 			}
 		}
 		// Remove queues at OLT in upstream and downstream direction
-		schedQ := schedQueue{tpInst: techprofileInst, direction: tp_pb.Direction_UPSTREAM, intfID: intfID, onuID: uint32(onuID), uniID: uint32(uniID), tpID: tpID, uniPort: portNum}
+		schedQ := schedQueue{tpInst: techprofileInst, direction: tp_pb.Direction_UPSTREAM, intfID: intfID, nniIntfID: nniIntfID, onuID: uint32(onuID), uniID: uint32(uniID), tpID: tpID, uniPort: portNum}
 		if err := f.RemoveQueues(ctx, schedQ); err != nil {
 			logger.Warn(ctx, err)
 		}
@@ -2095,7 +2101,7 @@
 }
 
 // nolint: gocyclo
-func (f *OpenOltFlowMgr) clearFlowFromDeviceAndResourceManager(ctx context.Context, flow *ofp.OfpFlowStats, flowDirection string) error {
+func (f *OpenOltFlowMgr) clearFlowFromDeviceAndResourceManager(ctx context.Context, flow *ofp.OfpFlowStats, flowDirection string, nniIntfID uint32) error {
 	logger.Infow(ctx, "clear-flow-from-resource-manager",
 		log.Fields{
 			"flowDirection": flowDirection,
@@ -2159,7 +2165,7 @@
 		return err
 	}
 
-	if err = f.clearResources(ctx, Intf, onuID, uniID, flow.Id, portNum, tpID, true); err != nil {
+	if err = f.clearResources(ctx, Intf, onuID, uniID, flow.Id, portNum, tpID, true, nniIntfID); err != nil {
 		logger.Errorw(ctx, "failed-to-clear-resources-for-flow", log.Fields{
 			"flow-id":   flow.Id,
 			"device-id": f.deviceHandler.device.Id,
@@ -2197,15 +2203,25 @@
 
 	if flows.HasGroup(flow) {
 		direction = Multicast
-		return f.clearFlowFromDeviceAndResourceManager(ctx, flow, direction)
 	} else if plt.IsUpstream(actionInfo[Output].(uint32)) {
 		direction = Upstream
 	} else {
 		direction = Downstream
 	}
 
+	var nniIntfID uint32
+	if direction == Upstream {
+		if !plt.IsControllerBoundFlow(actionInfo[Output].(uint32)) {
+			nniIntfID = actionInfo[Output].(uint32) & 0x1f // e.g. port number 16777220 -> NNI intf 4 (0-based)
+		}
+	} else {
+		classifierInfo := make(map[string]interface{})
+		formulateClassifierInfoFromFlow(ctx, classifierInfo, flow)
+		nniIntfID = classifierInfo[InPort].(uint32) & 0x1f // e.g. port number 16777220 -> NNI intf 4 (0-based)
+	}
+
 	// Serialize flow removes on a per subscriber basis
-	err := f.clearFlowFromDeviceAndResourceManager(ctx, flow, direction)
+	err := f.clearFlowFromDeviceAndResourceManager(ctx, flow, direction, nniIntfID)
 
 	return err
 }
@@ -2422,7 +2438,17 @@
 			return err
 		}
 	}
-	return f.processAddFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, tpID, UsMeterID, DsMeterID, flowMetadata)
+
+	var nniIntfID uint32
+	if plt.IsUpstream(actionInfo[Output].(uint32)) {
+		if !plt.IsControllerBoundFlow(actionInfo[Output].(uint32)) {
+			nniIntfID = actionInfo[Output].(uint32) & 0x1f // e.g. port number 16777220 -> NNI intf 4 (0-based)
+		}
+	} else {
+		nniIntfID = classifierInfo[InPort].(uint32) & 0x1f // e.g. port number 16777220 -> NNI intf 4 (0-based)
+	}
+
+	return f.processAddFlow(ctx, intfID, nniIntfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, tpID, UsMeterID, DsMeterID, flowMetadata)
 }
 
 // handleFlowWithGroup adds multicast flow to the device.
@@ -2859,6 +2885,15 @@
 
 	flowContext := &flowContext{classifierInfo, actionInfo, flow, pbitToGem, gemToAes, intfID, onuID, uniID, portNo, allocID, gemPortID, tpID}
 
+	var nniIntfID uint32
+	if plt.IsUpstream(actionInfo[Output].(uint32)) {
+		if !plt.IsControllerBoundFlow(actionInfo[Output].(uint32)) {
+			nniIntfID = actionInfo[Output].(uint32) & 0x1f // e.g. port number 16777220 -> NNI intf 4 (0-based)
+		}
+	} else {
+		nniIntfID = classifierInfo[InPort].(uint32) & 0x1f // e.g. port number 16777220 -> NNI intf 4 (0-based)
+	}
+
 	if ipProto, ok := classifierInfo[IPProto]; ok {
 		if ipProto.(uint32) == IPProtoDhcp {
 			logger.Infow(ctx, "adding-dhcp-flow", log.Fields{
@@ -2872,7 +2907,7 @@
 			if err := f.addDHCPTrapFlow(ctx, flowContext); err != nil {
 				logger.Warn(ctx, err)
 				logger.Errorw(ctx, "reverting-scheduler-and-queue-for-onu", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID, "flow-id": flow.Id, "tp-id": tpID})
-				_ = f.clearResources(ctx, intfID, int32(onuID), int32(uniID), flow.Id, portNo, tpID, false)
+				_ = f.clearResources(ctx, intfID, int32(onuID), int32(uniID), flow.Id, portNo, tpID, false, nniIntfID)
 				return err
 			}
 		} else if ipProto.(uint32) == IgmpProto {
@@ -2885,7 +2920,7 @@
 			if err := f.addIGMPTrapFlow(ctx, flowContext); err != nil {
 				logger.Warn(ctx, err)
 				logger.Errorw(ctx, "reverting-scheduler-and-queue-for-onu", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID, "flow-id": flow.Id, "tp-id": tpID})
-				_ = f.clearResources(ctx, intfID, int32(onuID), int32(uniID), flow.Id, portNo, tpID, false)
+				_ = f.clearResources(ctx, intfID, int32(onuID), int32(uniID), flow.Id, portNo, tpID, false, nniIntfID)
 				return err
 			}
 		} else {
@@ -2909,7 +2944,7 @@
 			if err := f.addEthTypeBasedFlow(ctx, flowContext, vlanID, ethType.(uint32)); err != nil {
 				logger.Warn(ctx, err)
 				logger.Errorw(ctx, "reverting-scheduler-and-queue-for-onu", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID, "flow-id": flow.Id, "tp-id": tpID})
-				_ = f.clearResources(ctx, intfID, int32(onuID), int32(uniID), flow.Id, portNo, tpID, false)
+				_ = f.clearResources(ctx, intfID, int32(onuID), int32(uniID), flow.Id, portNo, tpID, false, nniIntfID)
 				return err
 			}
 		} else if ethType.(uint32) == PPPoEDEthType {
@@ -2924,7 +2959,7 @@
 			if err := f.addUpstreamTrapFlow(ctx, flowContext); err != nil {
 				logger.Warn(ctx, err)
 				logger.Errorw(ctx, "reverting-scheduler-and-queue-for-onu", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID, "flow-id": flow.Id, "tp-id": tpID})
-				_ = f.clearResources(ctx, intfID, int32(onuID), int32(uniID), flow.Id, portNo, tpID, false)
+				_ = f.clearResources(ctx, intfID, int32(onuID), int32(uniID), flow.Id, portNo, tpID, false, nniIntfID)
 				return err
 			}
 		}
@@ -2938,7 +2973,7 @@
 		if err := f.addUpstreamDataPathFlow(ctx, flowContext); err != nil {
 			logger.Warn(ctx, err)
 			logger.Errorw(ctx, "reverting-scheduler-and-queue-for-onu", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID, "flow-id": flow.Id, "tp-id": tpID})
-			_ = f.clearResources(ctx, intfID, int32(onuID), int32(uniID), flow.Id, portNo, tpID, false)
+			_ = f.clearResources(ctx, intfID, int32(onuID), int32(uniID), flow.Id, portNo, tpID, false, nniIntfID)
 			return err
 		}
 	} else if direction == tp_pb.Direction_DOWNSTREAM {
@@ -2951,7 +2986,7 @@
 		if err := f.addDownstreamDataPathFlow(ctx, flowContext); err != nil {
 			logger.Warn(ctx, err)
 			logger.Errorw(ctx, "reverting-scheduler-and-queue-for-onu", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID, "flow-id": flow.Id, "tp-id": tpID})
-			_ = f.clearResources(ctx, intfID, int32(onuID), int32(uniID), flow.Id, portNo, tpID, false)
+			_ = f.clearResources(ctx, intfID, int32(onuID), int32(uniID), flow.Id, portNo, tpID, false, nniIntfID)
 			return err
 		}
 	} else {
@@ -3234,6 +3269,24 @@
 		}
 		logger.Infow(ctx, "input-nni-intfId-is", log.Fields{"intf-id": intfID})
 		return intfID, nil
+	} else if portType == voltha.Port_ETHERNET_UNI {
+		if _, ok := action[Output]; ok {
+			intfID, err := plt.IntfIDFromNniPortNum(ctx, action[Output].(uint32))
+			if err != nil {
+				logger.Debugw(ctx, "invalid-action-port-number",
+					log.Fields{
+						"port-number": action[Output].(uint32),
+						"err":         err})
+				return uint32(0), nil
+			}
+			logger.Infow(ctx, "output-nni-intfId-is", log.Fields{"intf-id": intfID})
+			return intfID, nil
+		} else {
+			logger.Debugw(ctx, "action-port-number-empty",
+				log.Fields{
+					"action": action})
+			return uint32(0), nil
+		}
 	}
 	return uint32(0), nil
 }
diff --git a/internal/pkg/core/openolt_flowmgr_test.go b/internal/pkg/core/openolt_flowmgr_test.go
index ca64bdd..0fb9fa1 100644
--- a/internal/pkg/core/openolt_flowmgr_test.go
+++ b/internal/pkg/core/openolt_flowmgr_test.go
@@ -85,30 +85,30 @@
 		wantErr    bool
 	}{
 		// TODO: Add test cases.
-		{"CreateSchedulerQueues-1", schedQueue{tprofile, createFlowMetadata(tprofile, 1, Upstream), tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, 1}, false},
-		{"CreateSchedulerQueues-2", schedQueue{tprofile2, createFlowMetadata(tprofile2, 1, Downstream), tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, 1}, false},
-		{"CreateSchedulerQueues-13", schedQueue{tprofile, createFlowMetadata(tprofile, 2, Upstream), tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, 1}, false},
-		{"CreateSchedulerQueues-14", schedQueue{tprofile2, createFlowMetadata(tprofile2, 2, Downstream), tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, 1}, false},
-		{"CreateSchedulerQueues-15", schedQueue{tprofile, createFlowMetadata(tprofile, 3, Upstream), tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, 1}, false},
-		{"CreateSchedulerQueues-16", schedQueue{tprofile2, createFlowMetadata(tprofile2, 3, Downstream), tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, 1}, false},
-		{"CreateSchedulerQueues-17", schedQueue{tprofile, createFlowMetadata(tprofile, 4, Upstream), tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, 1}, false},
-		{"CreateSchedulerQueues-18", schedQueue{tprofile2, createFlowMetadata(tprofile2, 4, Downstream), tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, 1}, false},
-		{"CreateSchedulerQueues-19", schedQueue{tprofile, createFlowMetadata(tprofile, 5, Upstream), tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, 1}, false},
-		{"CreateSchedulerQueues-20", schedQueue{tprofile2, createFlowMetadata(tprofile2, 5, Downstream), tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, 1}, false},
+		{"CreateSchedulerQueues-1", schedQueue{tprofile, createFlowMetadata(tprofile, 1, Upstream), tp_pb.Direction_UPSTREAM, 0, 0, 1, 1, 64, 1, 1}, false},
+		{"CreateSchedulerQueues-2", schedQueue{tprofile2, createFlowMetadata(tprofile2, 1, Downstream), tp_pb.Direction_DOWNSTREAM, 0, 0, 1, 1, 65, 1, 1}, false},
+		{"CreateSchedulerQueues-13", schedQueue{tprofile, createFlowMetadata(tprofile, 2, Upstream), tp_pb.Direction_UPSTREAM, 0, 0, 1, 1, 64, 1, 1}, false},
+		{"CreateSchedulerQueues-14", schedQueue{tprofile2, createFlowMetadata(tprofile2, 2, Downstream), tp_pb.Direction_DOWNSTREAM, 0, 0, 1, 1, 65, 1, 1}, false},
+		{"CreateSchedulerQueues-15", schedQueue{tprofile, createFlowMetadata(tprofile, 3, Upstream), tp_pb.Direction_UPSTREAM, 0, 0, 1, 1, 64, 1, 1}, false},
+		{"CreateSchedulerQueues-16", schedQueue{tprofile2, createFlowMetadata(tprofile2, 3, Downstream), tp_pb.Direction_DOWNSTREAM, 0, 0, 1, 1, 65, 1, 1}, false},
+		{"CreateSchedulerQueues-17", schedQueue{tprofile, createFlowMetadata(tprofile, 4, Upstream), tp_pb.Direction_UPSTREAM, 0, 0, 1, 1, 64, 1, 1}, false},
+		{"CreateSchedulerQueues-18", schedQueue{tprofile2, createFlowMetadata(tprofile2, 4, Downstream), tp_pb.Direction_DOWNSTREAM, 0, 0, 1, 1, 65, 1, 1}, false},
+		{"CreateSchedulerQueues-19", schedQueue{tprofile, createFlowMetadata(tprofile, 5, Upstream), tp_pb.Direction_UPSTREAM, 0, 0, 1, 1, 64, 1, 1}, false},
+		{"CreateSchedulerQueues-20", schedQueue{tprofile2, createFlowMetadata(tprofile2, 5, Downstream), tp_pb.Direction_DOWNSTREAM, 0, 0, 1, 1, 65, 1, 1}, false},
 
 		// Negative testcases
-		{"CreateSchedulerQueues-1", schedQueue{tprofile, createFlowMetadata(tprofile, 0, Upstream), tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, 1}, true},
-		{"CreateSchedulerQueues-2", schedQueue{tprofile2, createFlowMetadata(tprofile2, 0, Downstream), tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, 1}, true},
-		{"CreateSchedulerQueues-3", schedQueue{tprofile, createFlowMetadata(tprofile, 2, Upstream), tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, 2}, true},
-		{"CreateSchedulerQueues-4", schedQueue{tprofile2, createFlowMetadata(tprofile2, 2, Downstream), tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, 2}, true},
-		{"CreateSchedulerQueues-5", schedQueue{tprofile, createFlowMetadata(tprofile, 3, Upstream), tp_pb.Direction_UPSTREAM, 1, 2, 2, 64, 2, 2}, true},
-		{"CreateSchedulerQueues-6", schedQueue{tprofile2, createFlowMetadata(tprofile2, 3, Downstream), tp_pb.Direction_DOWNSTREAM, 1, 2, 2, 65, 2, 2}, true},
-		{"CreateSchedulerQueues-7", schedQueue{tprofile, &ofp.FlowMetadata{}, tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, 1}, true},
-		{"CreateSchedulerQueues-8", schedQueue{tprofile, &ofp.FlowMetadata{}, tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, 0}, true},
-		{"CreateSchedulerQueues-9", schedQueue{tprofile2, &ofp.FlowMetadata{}, tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, 1}, true},
-		{"CreateSchedulerQueues-10", schedQueue{tprofile, &ofp.FlowMetadata{}, tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, 2}, true},
-		{"CreateSchedulerQueues-11", schedQueue{tprofile2, &ofp.FlowMetadata{}, tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, 2}, true},
-		{"CreateSchedulerQueues-12", schedQueue{tprofile2, nil, tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, 2}, true},
+		{"CreateSchedulerQueues-1", schedQueue{tprofile, createFlowMetadata(tprofile, 0, Upstream), tp_pb.Direction_UPSTREAM, 0, 0, 1, 1, 64, 1, 1}, true},
+		{"CreateSchedulerQueues-2", schedQueue{tprofile2, createFlowMetadata(tprofile2, 0, Downstream), tp_pb.Direction_DOWNSTREAM, 0, 0, 1, 1, 65, 1, 1}, true},
+		{"CreateSchedulerQueues-3", schedQueue{tprofile, createFlowMetadata(tprofile, 2, Upstream), tp_pb.Direction_UPSTREAM, 0, 0, 1, 1, 64, 1, 2}, true},
+		{"CreateSchedulerQueues-4", schedQueue{tprofile2, createFlowMetadata(tprofile2, 2, Downstream), tp_pb.Direction_DOWNSTREAM, 0, 0, 1, 1, 65, 1, 2}, true},
+		{"CreateSchedulerQueues-5", schedQueue{tprofile, createFlowMetadata(tprofile, 3, Upstream), tp_pb.Direction_UPSTREAM, 1, 0, 2, 2, 64, 2, 2}, true},
+		{"CreateSchedulerQueues-6", schedQueue{tprofile2, createFlowMetadata(tprofile2, 3, Downstream), tp_pb.Direction_DOWNSTREAM, 1, 0, 2, 2, 65, 2, 2}, true},
+		{"CreateSchedulerQueues-7", schedQueue{tprofile, &ofp.FlowMetadata{}, tp_pb.Direction_UPSTREAM, 0, 0, 1, 1, 64, 1, 1}, true},
+		{"CreateSchedulerQueues-8", schedQueue{tprofile, &ofp.FlowMetadata{}, tp_pb.Direction_UPSTREAM, 0, 0, 1, 1, 64, 1, 0}, true},
+		{"CreateSchedulerQueues-9", schedQueue{tprofile2, &ofp.FlowMetadata{}, tp_pb.Direction_DOWNSTREAM, 0, 0, 1, 1, 65, 1, 1}, true},
+		{"CreateSchedulerQueues-10", schedQueue{tprofile, &ofp.FlowMetadata{}, tp_pb.Direction_UPSTREAM, 0, 0, 1, 1, 64, 1, 2}, true},
+		{"CreateSchedulerQueues-11", schedQueue{tprofile2, &ofp.FlowMetadata{}, tp_pb.Direction_DOWNSTREAM, 0, 0, 1, 1, 65, 1, 2}, true},
+		{"CreateSchedulerQueues-12", schedQueue{tprofile2, nil, tp_pb.Direction_DOWNSTREAM, 0, 0, 1, 1, 65, 1, 2}, true},
 	}
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 	defer cancel()
@@ -186,10 +186,10 @@
 		schedQueue schedQueue
 		wantErr    bool
 	}{
-		{"RemoveScheduler-1", schedQueue{tprofile, nil, tp_pb.Direction_UPSTREAM, 1, 1, 1, 64, 1, 0}, false},
-		{"RemoveScheduler-2", schedQueue{tprofile2, nil, tp_pb.Direction_DOWNSTREAM, 1, 1, 1, 65, 1, 0}, false},
-		{"RemoveScheduler-3", schedQueue{tprofile2, nil, tp_pb.Direction_DOWNSTREAM, 1, 1, 1, 65, 1, 0}, false},
-		{"RemoveScheduler-4", schedQueue{tprofile2, nil, tp_pb.Direction_DOWNSTREAM, 1, 1, 1, 65, 1, 0}, false},
+		{"RemoveScheduler-1", schedQueue{tprofile, nil, tp_pb.Direction_UPSTREAM, 1, 0, 1, 1, 64, 1, 0}, false},
+		{"RemoveScheduler-2", schedQueue{tprofile2, nil, tp_pb.Direction_DOWNSTREAM, 1, 0, 1, 1, 65, 1, 0}, false},
+		{"RemoveScheduler-3", schedQueue{tprofile2, nil, tp_pb.Direction_DOWNSTREAM, 1, 0, 1, 1, 65, 1, 0}, false},
+		{"RemoveScheduler-4", schedQueue{tprofile2, nil, tp_pb.Direction_DOWNSTREAM, 1, 0, 1, 1, 65, 1, 0}, false},
 	}
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 	defer cancel()
@@ -250,10 +250,10 @@
 		schedQueue schedQueue
 		wantErr    bool
 	}{
-		{"RemoveQueues-1", schedQueue{tprofile, nil, tp_pb.Direction_UPSTREAM, 1, 1, 1, 64, 1, 0}, false},
-		{"RemoveQueues-2", schedQueue{tprofile2, nil, tp_pb.Direction_DOWNSTREAM, 1, 1, 1, 65, 1, 0}, false},
-		{"RemoveQueues-3", schedQueue{tprofile2, nil, tp_pb.Direction_DOWNSTREAM, 1, 1, 1, 65, 1, 0}, false},
-		{"RemoveQueues-4", schedQueue{tprofile2, nil, tp_pb.Direction_DOWNSTREAM, 1, 1, 1, 65, 1, 0}, false},
+		{"RemoveQueues-1", schedQueue{tprofile, nil, tp_pb.Direction_UPSTREAM, 1, 0, 1, 1, 64, 1, 0}, false},
+		{"RemoveQueues-2", schedQueue{tprofile2, nil, tp_pb.Direction_DOWNSTREAM, 1, 0, 1, 1, 65, 1, 0}, false},
+		{"RemoveQueues-3", schedQueue{tprofile2, nil, tp_pb.Direction_DOWNSTREAM, 1, 0, 1, 1, 65, 1, 0}, false},
+		{"RemoveQueues-4", schedQueue{tprofile2, nil, tp_pb.Direction_DOWNSTREAM, 1, 0, 1, 1, 65, 1, 0}, false},
 	}
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 	defer cancel()
@@ -276,6 +276,7 @@
 	}
 	type args struct {
 		intfID       uint32
+		nniIntfID    uint32
 		onuID        uint32
 		uniID        uint32
 		uni          string
@@ -295,7 +296,7 @@
 	defer cancel()
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			_, _, tpInst := flowMgr[tt.args.intfID].createTcontGemports(ctx, tt.args.intfID, tt.args.onuID, tt.args.uniID, tt.args.uni, tt.args.uniPort, tt.args.TpID, tt.args.UsMeterID, tt.args.DsMeterID, tt.args.flowMetadata)
+			_, _, tpInst := flowMgr[tt.args.intfID].createTcontGemports(ctx, tt.args.intfID, tt.args.nniIntfID, tt.args.onuID, tt.args.uniID, tt.args.uni, tt.args.uniPort, tt.args.TpID, tt.args.UsMeterID, tt.args.DsMeterID, tt.args.flowMetadata)
 			switch tpInst := tpInst.(type) {
 			case *tp_pb.TechProfileInstance:
 				if tt.args.TpID != 64 {
diff --git a/internal/pkg/core/statsmanager.go b/internal/pkg/core/statsmanager.go
index 8cbc243..d978ee9 100644
--- a/internal/pkg/core/statsmanager.go
+++ b/internal/pkg/core/statsmanager.go
@@ -284,7 +284,8 @@
 	// Northbound and Southbound ports
 	// added to initialize the pm_metrics
 	var Ports interface{}
-	Ports, _ = InitPorts(ctx, "nni", Dev.device.Id, 1)
+	NumNniPorts := Dev.resourceMgr[0].DevInfo.GetNniPorts()
+	Ports, _ = InitPorts(ctx, "nni", Dev.device.Id, NumNniPorts)
 	StatMgr.NorthBoundPort, _ = Ports.(map[uint32]*NniPort)
 	NumPonPorts := Dev.resourceMgr[0].DevInfo.GetPonPorts()
 	Ports, _ = InitPorts(ctx, "pon", Dev.device.Id, NumPonPorts)