VOL-2296 Install TrafficDescriptor ME to provide upstream rate-limiting at ONU and set traffic descriptor info into Gem Port Network CTP ME. Also, These rate-limiting value gets meter bands.

Change-Id: Ib6189d5b1e25734fff1702d3dfa16736ad0b1377
diff --git a/internal/pkg/onuadaptercore/device_handler.go b/internal/pkg/onuadaptercore/device_handler.go
index e5444d4..b06c0e7 100644
--- a/internal/pkg/onuadaptercore/device_handler.go
+++ b/internal/pkg/onuadaptercore/device_handler.go
@@ -636,8 +636,7 @@
 func (dh *deviceHandler) FlowUpdateIncremental(ctx context.Context,
 	apOfFlowChanges *openflow_13.FlowChanges,
 	apOfGroupChanges *openflow_13.FlowGroupChanges, apFlowMetaData *voltha.FlowMetadata) error {
-	logger.Debugw(ctx, "FlowUpdateIncremental started", log.Fields{"device-id": dh.deviceID})
-
+	logger.Debugw(ctx, "FlowUpdateIncremental started", log.Fields{"device-id": dh.deviceID, "metadata": apFlowMetaData})
 	var retError error = nil
 	//Remove flows (always remove flows first - remove old and add new with same cookie may be part of the same request)
 	if apOfFlowChanges.ToRemove != nil {
@@ -737,7 +736,7 @@
 				logger.Debugw(ctx, "flow-add port indications", log.Fields{
 					"device-id": dh.deviceID, "inPort": flowInPort, "outPort": flowOutPort,
 					"uniPortName": loUniPort.name})
-				err := dh.addFlowItemToUniPort(ctx, flowItem, loUniPort)
+				err := dh.addFlowItemToUniPort(ctx, flowItem, loUniPort, apFlowMetaData)
 				//try next flow after processing error
 				if err != nil {
 					logger.Warnw(ctx, "flow-add processing error: continuing on checking further flows",
@@ -1012,7 +1011,7 @@
 			if _, exist = dh.UniVlanConfigFsmMap[uniData.PersUniID]; exist {
 				if err := dh.UniVlanConfigFsmMap[uniData.PersUniID].SetUniFlowParams(ctx, flowData.VlanRuleParams.TpID,
 					flowData.CookieSlice, uint16(flowData.VlanRuleParams.MatchVid), uint16(flowData.VlanRuleParams.SetVid),
-					uint8(flowData.VlanRuleParams.SetPcp), lastFlowToReconcile); err != nil {
+					uint8(flowData.VlanRuleParams.SetPcp), lastFlowToReconcile, flowData.Meter); err != nil {
 					logger.Errorw(ctx, err.Error(), log.Fields{"device-id": dh.deviceID})
 				}
 				dh.lockVlanConfig.RUnlock()
@@ -1020,7 +1019,7 @@
 				dh.lockVlanConfig.RUnlock()
 				if err := dh.createVlanFilterFsm(ctx, uniPort, flowData.VlanRuleParams.TpID, flowData.CookieSlice,
 					uint16(flowData.VlanRuleParams.MatchVid), uint16(flowData.VlanRuleParams.SetVid),
-					uint8(flowData.VlanRuleParams.SetPcp), OmciVlanFilterAddDone, lastFlowToReconcile); err != nil {
+					uint8(flowData.VlanRuleParams.SetPcp), OmciVlanFilterAddDone, lastFlowToReconcile, flowData.Meter); err != nil {
 					logger.Errorw(ctx, err.Error(), log.Fields{"device-id": dh.deviceID})
 				}
 			}
@@ -2890,7 +2889,8 @@
 }
 
 //addFlowItemToUniPort parses the actual flow item to add it to the UniPort
-func (dh *deviceHandler) addFlowItemToUniPort(ctx context.Context, apFlowItem *ofp.OfpFlowStats, apUniPort *onuUniPort) error {
+func (dh *deviceHandler) addFlowItemToUniPort(ctx context.Context, apFlowItem *ofp.OfpFlowStats, apUniPort *onuUniPort,
+	apFlowMetaData *voltha.FlowMetadata) error {
 	var loSetVlan uint16 = uint16(of.OfpVlanId_OFPVID_NONE)      //noValidEntry
 	var loMatchVlan uint16 = uint16(of.OfpVlanId_OFPVID_PRESENT) //reserved VLANID entry
 	var loAddPcp, loSetPcp uint8
@@ -2952,16 +2952,20 @@
 
 	//mutex protection as the update_flow rpc maybe running concurrently for different flows, perhaps also activities
 	dh.lockVlanConfig.RLock()
-	logger.Debugw(ctx, "flow-add got lock", log.Fields{"device-id": dh.deviceID})
+	logger.Debugw(ctx, "flow-add got lock", log.Fields{"device-id": dh.deviceID, "tpID": loTpID, "uniID": apUniPort.uniID})
+	var meter *voltha.OfpMeterConfig
+	if apFlowMetaData != nil {
+		meter = apFlowMetaData.Meters[0]
+	}
 	if _, exist := dh.UniVlanConfigFsmMap[apUniPort.uniID]; exist {
 		err := dh.UniVlanConfigFsmMap[apUniPort.uniID].SetUniFlowParams(ctx, loTpID, loCookieSlice,
-			loMatchVlan, loSetVlan, loSetPcp, false)
+			loMatchVlan, loSetVlan, loSetPcp, false, meter)
 		dh.lockVlanConfig.RUnlock()
 		return err
 	}
 	dh.lockVlanConfig.RUnlock()
 	return dh.createVlanFilterFsm(ctx, apUniPort, loTpID, loCookieSlice,
-		loMatchVlan, loSetVlan, loSetPcp, OmciVlanFilterAddDone, false)
+		loMatchVlan, loSetVlan, loSetPcp, OmciVlanFilterAddDone, false, meter)
 }
 
 //removeFlowItemFromUniPort parses the actual flow item to remove it from the UniPort
@@ -3011,7 +3015,7 @@
 // createVlanFilterFsm initializes and runs the VlanFilter FSM to transfer OMCI related VLAN config
 // if this function is called from possibly concurrent processes it must be mutex-protected from the caller!
 func (dh *deviceHandler) createVlanFilterFsm(ctx context.Context, apUniPort *onuUniPort, aTpID uint8, aCookieSlice []uint64,
-	aMatchVlan uint16, aSetVlan uint16, aSetPcp uint8, aDevEvent OnuDeviceEvent, lastFlowToReconcile bool) error {
+	aMatchVlan uint16, aSetVlan uint16, aSetPcp uint8, aDevEvent OnuDeviceEvent, lastFlowToReconcile bool, aMeter *voltha.OfpMeterConfig) error {
 	chVlanFilterFsm := make(chan Message, 2048)
 
 	pDevEntry := dh.getOnuDeviceEntry(ctx, true)
@@ -3022,7 +3026,7 @@
 
 	pVlanFilterFsm := NewUniVlanConfigFsm(ctx, dh, pDevEntry.PDevOmciCC, apUniPort, dh.pOnuTP,
 		pDevEntry.pOnuDB, aTpID, aDevEvent, "UniVlanConfigFsm", chVlanFilterFsm,
-		dh.pOpenOnuAc.AcceptIncrementalEvto, aCookieSlice, aMatchVlan, aSetVlan, aSetPcp, lastFlowToReconcile)
+		dh.pOpenOnuAc.AcceptIncrementalEvto, aCookieSlice, aMatchVlan, aSetVlan, aSetPcp, lastFlowToReconcile, aMeter)
 	if pVlanFilterFsm != nil {
 		dh.lockVlanConfig.Lock()
 		//ensure the mutex is locked throughout the state transition to 'starting' to prevent unintended (ignored) events to be sent there
diff --git a/internal/pkg/onuadaptercore/omci_ani_config.go b/internal/pkg/onuadaptercore/omci_ani_config.go
index 94cf938..e50bd58 100644
--- a/internal/pkg/onuadaptercore/omci_ani_config.go
+++ b/internal/pkg/onuadaptercore/omci_ani_config.go
@@ -402,6 +402,7 @@
 			loGemPortAttribs.qosPolicy = gemEntry.queueSchedPolicy
 			loGemPortAttribs.weight = gemEntry.queueWeight
 			loGemPortAttribs.pbitString = gemEntry.pbitString
+
 			if gemEntry.isMulticast {
 				//TODO this might effectively ignore the for loop starting at line 316
 				loGemPortAttribs.gemPortID = gemEntry.multicastGemPortID
diff --git a/internal/pkg/onuadaptercore/omci_cc.go b/internal/pkg/onuadaptercore/omci_cc.go
index 713426c..df13717 100644
--- a/internal/pkg/onuadaptercore/omci_cc.go
+++ b/internal/pkg/onuadaptercore/omci_cc.go
@@ -1489,6 +1489,48 @@
 	return nil, omciErr.GetError()
 }
 
+func (oo *omciCC) sendSetGemNCTPVar(ctx context.Context, timeout int, highPrio bool, rxChan chan Message, params ...me.ParamData) (*me.ManagedEntity, error) {
+	tid := oo.getNextTid(highPrio)
+	logger.Debugw(ctx, "send GemNCTP-Set-msg:", log.Fields{"device-id": oo.deviceID,
+		"SequNo": strconv.FormatInt(int64(tid), 16),
+		"InstId": strconv.FormatInt(int64(params[0].EntityID), 16)})
+
+	meInstance, omciErr := me.NewGemPortNetworkCtp(params[0])
+	if omciErr.GetError() == nil {
+		//obviously we have to set all 'untouched' parameters to default by some additional option parameter!!
+		omciLayer, msgLayer, err := omci.EncodeFrame(meInstance, omci.SetRequestType,
+			omci.TransactionID(tid), omci.AddDefaults(true))
+		if err != nil {
+			logger.Errorw(ctx, "Cannot encode GemNCTP for set", log.Fields{
+				"Err": err, "device-id": oo.deviceID})
+			return nil, err
+		}
+
+		pkt, err := serializeOmciLayer(ctx, omciLayer, msgLayer)
+		if err != nil {
+			logger.Errorw(ctx, "Cannot serialize GemNCTP set", log.Fields{
+				"Err": err, "device-id": oo.deviceID})
+			return nil, err
+		}
+
+		omciRxCallbackPair := callbackPair{
+			cbKey:   tid,
+			cbEntry: callbackPairEntry{rxChan, oo.receiveOmciResponse, true},
+		}
+		err = oo.send(ctx, pkt, timeout, cDefaultRetries, highPrio, omciRxCallbackPair)
+		if err != nil {
+			logger.Errorw(ctx, "Cannot send GemNCTP set", log.Fields{
+				"Err": err, "device-id": oo.deviceID})
+			return nil, err
+		}
+		logger.Debug(ctx, "send GemNCTP-Set-msg done", log.Fields{"device-id": oo.deviceID})
+		return meInstance, nil
+	}
+	logger.Errorw(ctx, "Cannot generate GemNCTP Instance", log.Fields{
+		"Err": omciErr.GetError(), "device-id": oo.deviceID})
+	return nil, omciErr.GetError()
+}
+
 func (oo *omciCC) sendCreateGemIWTPVar(ctx context.Context, timeout int, highPrio bool,
 	rxChan chan Message, params ...me.ParamData) (*me.ManagedEntity, error) {
 	tid := oo.getNextTid(highPrio)
@@ -1916,9 +1958,7 @@
 	return nil, omciErr.GetError()
 }
 
-// nolint: unused
-func (oo *omciCC) sendCreateTDVar(ctx context.Context, timeout int, highPrio bool,
-	rxChan chan Message, params ...me.ParamData) (*me.ManagedEntity, error) {
+func (oo *omciCC) sendCreateTDVar(ctx context.Context, timeout int, highPrio bool, rxChan chan Message, params ...me.ParamData) (*me.ManagedEntity, error) {
 	tid := oo.getNextTid(highPrio)
 	logger.Debugw(ctx, "send TD-Create-msg:", log.Fields{"device-id": oo.deviceID,
 		"SequNo": strconv.FormatInt(int64(tid), 16),
diff --git a/internal/pkg/onuadaptercore/omci_vlan_config.go b/internal/pkg/onuadaptercore/omci_vlan_config.go
index d95a8a7..4f8247a 100644
--- a/internal/pkg/onuadaptercore/omci_vlan_config.go
+++ b/internal/pkg/onuadaptercore/omci_vlan_config.go
@@ -27,6 +27,9 @@
 	"sync"
 	"time"
 
+	meters "github.com/opencord/voltha-lib-go/v5/pkg/meters"
+	"github.com/opencord/voltha-protos/v4/go/voltha"
+
 	gp "github.com/google/gopacket"
 	"github.com/looplab/fsm"
 	"github.com/opencord/omci-lib-go"
@@ -137,8 +140,9 @@
 }
 
 type uniVlanFlowParams struct {
-	CookieSlice    []uint64          `json:"cookie_slice"`
-	VlanRuleParams uniVlanRuleParams `json:"vlan_rule_params"`
+	CookieSlice    []uint64               `json:"cookie_slice"`
+	VlanRuleParams uniVlanRuleParams      `json:"vlan_rule_params"`
+	Meter          *voltha.OfpMeterConfig `json:"flow_meter"`
 }
 
 type uniRemoveVlanFlowParams struct {
@@ -168,6 +172,7 @@
 	mutexFlowParams             sync.RWMutex
 	chCookieDeleted             chan bool //channel to indicate that a specific cookie (related to the active rule) was deleted
 	actualUniVlanConfigRule     uniVlanRuleParams
+	actualUniVlanConfigMeter    *voltha.OfpMeterConfig
 	uniVlanFlowParamsSlice      []uniVlanFlowParams
 	uniRemoveFlowsSlice         []uniRemoveVlanFlowParams
 	numUniFlows                 uint8 // expected number of flows should be less than 12
@@ -194,7 +199,7 @@
 func NewUniVlanConfigFsm(ctx context.Context, apDeviceHandler *deviceHandler, apDevOmciCC *omciCC, apUniPort *onuUniPort,
 	apUniTechProf *onuUniTechProf, apOnuDB *onuDeviceDB, aTechProfileID uint8,
 	aRequestEvent OnuDeviceEvent, aName string, aCommChannel chan Message, aAcceptIncrementalEvto bool,
-	aCookieSlice []uint64, aMatchVlan uint16, aSetVlan uint16, aSetPcp uint8, lastFlowToRec bool) *UniVlanConfigFsm {
+	aCookieSlice []uint64, aMatchVlan uint16, aSetVlan uint16, aSetPcp uint8, lastFlowToRec bool, aMeter *voltha.OfpMeterConfig) *UniVlanConfigFsm {
 	instFsm := &UniVlanConfigFsm{
 		pDeviceHandler:              apDeviceHandler,
 		deviceID:                    apDeviceHandler.deviceID,
@@ -274,7 +279,7 @@
 		return nil
 	}
 
-	_ = instFsm.initUniFlowParams(ctx, aTechProfileID, aCookieSlice, aMatchVlan, aSetVlan, aSetPcp)
+	_ = instFsm.initUniFlowParams(ctx, aTechProfileID, aCookieSlice, aMatchVlan, aSetVlan, aSetPcp, aMeter)
 
 	logger.Debugw(ctx, "UniVlanConfigFsm created", log.Fields{"device-id": instFsm.deviceID,
 		"accIncrEvto": instFsm.acceptIncrementalEvtoOption})
@@ -283,7 +288,7 @@
 
 //initUniFlowParams is a simplified form of SetUniFlowParams() used for first flow parameters configuration
 func (oFsm *UniVlanConfigFsm) initUniFlowParams(ctx context.Context, aTpID uint8, aCookieSlice []uint64,
-	aMatchVlan uint16, aSetVlan uint16, aSetPcp uint8) error {
+	aMatchVlan uint16, aSetVlan uint16, aSetPcp uint8, aMeter *voltha.OfpMeterConfig) error {
 	loRuleParams := uniVlanRuleParams{
 		TpID:     aTpID,
 		MatchVid: uint32(aMatchVlan),
@@ -321,6 +326,9 @@
 	loFlowParams := uniVlanFlowParams{VlanRuleParams: loRuleParams}
 	loFlowParams.CookieSlice = make([]uint64, 0)
 	loFlowParams.CookieSlice = append(loFlowParams.CookieSlice, aCookieSlice...)
+	if aMeter != nil {
+		loFlowParams.Meter = aMeter
+	}
 
 	//no mutex protection is required for initial access and adding the first flow is always possible
 	oFsm.uniVlanFlowParamsSlice = make([]uniVlanFlowParams, 0)
@@ -330,7 +338,7 @@
 		"MatchVid":  strconv.FormatInt(int64(loRuleParams.MatchVid), 16),
 		"SetVid":    strconv.FormatInt(int64(loRuleParams.SetVid), 16),
 		"SetPcp":    loRuleParams.SetPcp,
-		"device-id": oFsm.deviceID})
+		"device-id": oFsm.deviceID, "uni-id": oFsm.pOnuUniPort.uniID})
 	oFsm.numUniFlows = 1
 	oFsm.uniRemoveFlowsSlice = make([]uniRemoveVlanFlowParams, 0) //initially nothing to remove
 
@@ -391,7 +399,7 @@
 // ignore complexity by now
 // nolint: gocyclo
 func (oFsm *UniVlanConfigFsm) SetUniFlowParams(ctx context.Context, aTpID uint8, aCookieSlice []uint64,
-	aMatchVlan uint16, aSetVlan uint16, aSetPcp uint8, lastFlowToReconcile bool) error {
+	aMatchVlan uint16, aSetVlan uint16, aSetPcp uint8, lastFlowToReconcile bool, aMeter *voltha.OfpMeterConfig) error {
 	loRuleParams := uniVlanRuleParams{
 		TpID:     aTpID,
 		MatchVid: uint32(aMatchVlan),
@@ -401,7 +409,6 @@
 	// some automatic adjustments on the filter/treat parameters as not specifically configured/ensured by flow configuration parameters
 	loRuleParams.TagsToRemove = 1            //one tag to remove as default setting
 	loRuleParams.MatchPcp = cPrioDoNotFilter // do not Filter on prio as default
-
 	if loRuleParams.SetVid == uint32(of.OfpVlanId_OFPVID_PRESENT) {
 		//then matchVlan is don't care and should be overwritten to 'transparent' here to avoid unneeded multiple flow entries
 		loRuleParams.MatchVid = uint32(of.OfpVlanId_OFPVID_PRESENT)
@@ -468,7 +475,10 @@
 		if storedUniFlowParams.VlanRuleParams == loRuleParams {
 			flowEntryMatch = true
 			logger.Debugw(ctx, "UniVlanConfigFsm flow setting - rule already exists", log.Fields{
-				"device-id": oFsm.deviceID})
+				"MatchVid":  strconv.FormatInt(int64(loRuleParams.MatchVid), 16),
+				"SetVid":    strconv.FormatInt(int64(loRuleParams.SetVid), 16),
+				"SetPcp":    loRuleParams.SetPcp,
+				"device-id": oFsm.deviceID, " uni-id": oFsm.pOnuUniPort.uniID})
 			var cookieMatch bool
 			for _, newCookie := range aCookieSlice { // for all cookies available in the arguments
 				cookieMatch = false
@@ -519,13 +529,16 @@
 			loFlowParams := uniVlanFlowParams{VlanRuleParams: loRuleParams}
 			loFlowParams.CookieSlice = make([]uint64, 0)
 			loFlowParams.CookieSlice = append(loFlowParams.CookieSlice, aCookieSlice...)
+			if aMeter != nil {
+				loFlowParams.Meter = aMeter
+			}
 			oFsm.uniVlanFlowParamsSlice = append(oFsm.uniVlanFlowParamsSlice, loFlowParams)
 			logger.Debugw(ctx, "UniVlanConfigFsm flow add", log.Fields{
 				"Cookies":  oFsm.uniVlanFlowParamsSlice[oFsm.numUniFlows].CookieSlice,
 				"MatchVid": strconv.FormatInt(int64(loRuleParams.MatchVid), 16),
 				"SetVid":   strconv.FormatInt(int64(loRuleParams.SetVid), 16),
 				"SetPcp":   loRuleParams.SetPcp, "numberofFlows": oFsm.numUniFlows + 1,
-				"device-id": oFsm.deviceID})
+				"device-id": oFsm.deviceID, "uni-id": oFsm.pOnuUniPort.uniID})
 
 			oFsm.numUniFlows++
 			pConfigVlanStateBaseFsm := oFsm.pAdaptFsm.pFsm
@@ -573,6 +586,7 @@
 						return fmt.Errorf("abort UniVlanConfigFsm on add due to internal counter mismatch %s", oFsm.deviceID)
 					}
 					oFsm.actualUniVlanConfigRule = oFsm.uniVlanFlowParamsSlice[oFsm.configuredUniFlow].VlanRuleParams
+					oFsm.actualUniVlanConfigMeter = oFsm.uniVlanFlowParamsSlice[oFsm.configuredUniFlow].Meter
 					//tpId of the next rule to be configured
 					tpID := oFsm.actualUniVlanConfigRule.TpID
 					loTechProfDone := oFsm.pUniTechProf.getTechProfileDone(ctx, oFsm.pOnuUniPort.uniID, tpID)
@@ -1065,6 +1079,7 @@
 		//access to uniVlanFlowParamsSlice is done on first element only here per definition
 		//store the actual rule that shall be worked upon in the following transient states
 		oFsm.actualUniVlanConfigRule = oFsm.uniVlanFlowParamsSlice[0].VlanRuleParams
+		oFsm.actualUniVlanConfigMeter = oFsm.uniVlanFlowParamsSlice[0].Meter
 		tpID := oFsm.actualUniVlanConfigRule.TpID
 		oFsm.TpIDWaitingFor = tpID
 		loTechProfDone := oFsm.pUniTechProf.getTechProfileDone(ctx, oFsm.pOnuUniPort.uniID, uint8(tpID))
@@ -1184,6 +1199,21 @@
 				oFsm.mutexFlowParams.RLock()
 			}
 			oFsm.mutexFlowParams.RUnlock()
+			//If this first flow contains a meter, then create TD for related gems.
+			if oFsm.actualUniVlanConfigMeter != nil {
+				logger.Debugw(ctx, "Creating Traffic Descriptor", log.Fields{"device-id": oFsm.deviceID, "meter": oFsm.actualUniVlanConfigMeter})
+				for _, gemPort := range oFsm.pUniTechProf.getBidirectionalGemPortIDsForTP(ctx, oFsm.pOnuUniPort.uniID, tpID) {
+					logger.Debugw(ctx, "Creating Traffic Descriptor for gem", log.Fields{"device-id": oFsm.deviceID, "meter": oFsm.actualUniVlanConfigMeter, "gem": gemPort})
+					errCreateTrafficDescriptor := oFsm.createTrafficDescriptor(ctx, oFsm.actualUniVlanConfigMeter, tpID,
+						oFsm.pOnuUniPort.uniID, gemPort)
+					if errCreateTrafficDescriptor != nil {
+						logger.Errorw(ctx, "Create Traffic Descriptor create failed, aborting Ani Config FSM!",
+							log.Fields{"device-id": oFsm.deviceID})
+						_ = oFsm.pAdaptFsm.pFsm.Event(vlanEvReset)
+					}
+				}
+			}
+
 			//TODO Possibly insert new state for multicast --> possibly another jira/later time.
 			_ = oFsm.pAdaptFsm.pFsm.Event(vlanEvRxConfigEvtocd)
 		}
@@ -1257,6 +1287,7 @@
 			return
 		}
 		oFsm.actualUniVlanConfigRule = oFsm.uniVlanFlowParamsSlice[oFsm.configuredUniFlow].VlanRuleParams
+		oFsm.actualUniVlanConfigMeter = oFsm.uniVlanFlowParamsSlice[oFsm.configuredUniFlow].Meter
 		//tpId of the next rule to be configured
 		tpID := oFsm.actualUniVlanConfigRule.TpID
 		oFsm.TpIDWaitingFor = tpID
@@ -1447,6 +1478,19 @@
 					_ = oFsm.pAdaptFsm.pFsm.Event(vlanEvReset)
 				}
 			}
+			//If this incremental flow contains a meter, then create TD for related gems.
+			if oFsm.actualUniVlanConfigMeter != nil {
+				for _, gemPort := range oFsm.pUniTechProf.getBidirectionalGemPortIDsForTP(ctx, oFsm.pOnuUniPort.uniID, tpID) {
+					logger.Debugw(ctx, "Creating Traffic Descriptor for gem", log.Fields{"device-id": oFsm.deviceID, "meter": oFsm.actualUniVlanConfigMeter, "gem": gemPort})
+					errCreateTrafficDescriptor := oFsm.createTrafficDescriptor(ctx, oFsm.actualUniVlanConfigMeter, tpID,
+						oFsm.pOnuUniPort.uniID, gemPort)
+					if errCreateTrafficDescriptor != nil {
+						logger.Errorw(ctx, "Create Traffic Descriptor create failed, aborting Ani Config FSM!",
+							log.Fields{"device-id": oFsm.deviceID})
+						_ = oFsm.pAdaptFsm.pFsm.Event(vlanEvReset)
+					}
+				}
+			}
 			_ = oFsm.pAdaptFsm.pFsm.Event(vlanEvRxConfigEvtocd)
 		}
 	}()
@@ -1831,7 +1875,7 @@
 				if msgObj.EntityClass == oFsm.pLastTxMeInstance.GetClassID() &&
 					msgObj.EntityInstance == oFsm.pLastTxMeInstance.GetEntityID() {
 					switch oFsm.pLastTxMeInstance.GetName() {
-					case "VlanTaggingFilterData", "ExtendedVlanTaggingOperationConfigurationData", "MulticastOperationsProfile":
+					case "VlanTaggingFilterData", "ExtendedVlanTaggingOperationConfigurationData", "MulticastOperationsProfile", "GemPortNetworkCtp":
 						{ // let the MultiEntity config proceed by stopping the wait function
 							oFsm.mutexPLastTxMeInstance.RUnlock()
 							oFsm.omciMIdsResponseReceived <- true
@@ -1896,7 +1940,7 @@
 			switch oFsm.pLastTxMeInstance.GetName() {
 			case "VlanTaggingFilterData", "MulticastOperationsProfile",
 				"MulticastSubscriberConfigInfo", "MacBridgePortConfigurationData",
-				"ExtendedVlanTaggingOperationConfigurationData":
+				"ExtendedVlanTaggingOperationConfigurationData", "TrafficDescriptor":
 				{
 					oFsm.mutexPLastTxMeInstance.RUnlock()
 					if oFsm.pAdaptFsm.pFsm.Current() == vlanStConfigVtfd {
@@ -1949,7 +1993,7 @@
 		if msgObj.EntityClass == oFsm.pLastTxMeInstance.GetClassID() &&
 			msgObj.EntityInstance == oFsm.pLastTxMeInstance.GetEntityID() {
 			switch oFsm.pLastTxMeInstance.GetName() {
-			case "VlanTaggingFilterData", "ExtendedVlanTaggingOperationConfigurationData":
+			case "VlanTaggingFilterData", "ExtendedVlanTaggingOperationConfigurationData", "TrafficDescriptor":
 				{ // let the MultiEntity config proceed by stopping the wait function
 					oFsm.mutexPLastTxMeInstance.RUnlock()
 					oFsm.omciMIdsResponseReceived <- true
@@ -2919,6 +2963,86 @@
 	return nil
 }
 
+func (oFsm *UniVlanConfigFsm) createTrafficDescriptor(ctx context.Context, aMeter *voltha.OfpMeterConfig,
+	tpID uint8, uniID uint8, gemPortID uint16) error {
+	logger.Infow(ctx, "Starting create traffic descriptor", log.Fields{"device-id": oFsm.deviceID, "uniID": uniID, "tpID": tpID})
+	// uniTPKey  generate id to Traffic Descriptor ME. We need to create two of them. They should be unique. Because of that
+	// I created unique TD ID by flow direction.
+	// TODO! Traffic descriptor ME ID will check
+	trafficDescriptorID := gemPortID
+	if aMeter == nil {
+		return fmt.Errorf("meter not found %s", oFsm.deviceID)
+	}
+	trafficShapingInfo, err := meters.GetTrafficShapingInfo(ctx, aMeter)
+	if err != nil {
+		logger.Errorw(ctx, "Traffic Shaping Info get failed", log.Fields{"device-id": oFsm.deviceID})
+		return err
+	}
+	cir := trafficShapingInfo.Cir + trafficShapingInfo.Gir
+	cbs := trafficShapingInfo.Cbs
+	pir := trafficShapingInfo.Pir
+	pbs := trafficShapingInfo.Pbs
+
+	logger.Infow(ctx, "cir-pir-cbs-pbs", log.Fields{"device-id": oFsm.deviceID, "cir": cir, "pir": pir, "cbs": cbs, "pbs": pbs})
+	meParams := me.ParamData{
+		EntityID: trafficDescriptorID,
+		Attributes: me.AttributeValueMap{
+			"Cir":                  cir,
+			"Pir":                  pir,
+			"Cbs":                  cbs,
+			"Pbs":                  pbs,
+			"ColourMode":           1,
+			"IngressColourMarking": 3,
+			"EgressColourMarking":  3,
+			"MeterType":            1,
+		},
+	}
+	meInstance, errCreateTD := oFsm.pOmciCC.sendCreateTDVar(log.WithSpanFromContext(context.TODO(), ctx), oFsm.pDeviceHandler.pOpenOnuAc.omciTimeout,
+		true, oFsm.pAdaptFsm.commChan, meParams)
+	if errCreateTD != nil {
+		logger.Errorw(ctx, "Traffic Descriptor create failed", log.Fields{"device-id": oFsm.deviceID})
+		return err
+	}
+	oFsm.pLastTxMeInstance = meInstance
+	err = oFsm.waitforOmciResponse(ctx)
+	if err != nil {
+		logger.Errorw(ctx, "Traffic Descriptor create failed, aborting VlanConfig FSM!", log.Fields{"device-id": oFsm.deviceID})
+		return err
+	}
+
+	err = oFsm.setTrafficDescriptorToGemPortNWCTP(ctx, gemPortID)
+	if err != nil {
+		logger.Errorw(ctx, "Traffic Descriptor set failed to Gem Port Network CTP, aborting VlanConfig FSM!", log.Fields{"device-id": oFsm.deviceID})
+		return err
+	}
+	logger.Infow(ctx, "Set TD Info to GemPortNWCTP successfully", log.Fields{"device-id": oFsm.deviceID, "gem-port-id": gemPortID, "td-id": trafficDescriptorID})
+
+	return nil
+}
+
+func (oFsm *UniVlanConfigFsm) setTrafficDescriptorToGemPortNWCTP(ctx context.Context, gemPortID uint16) error {
+	logger.Debugw(ctx, "Starting Set Traffic Descriptor to GemPortNWCTP", log.Fields{"device-id": oFsm.deviceID, "gem-port-id": gemPortID})
+	meParams := me.ParamData{
+		EntityID: gemPortID,
+		Attributes: me.AttributeValueMap{
+			"TrafficManagementPointerForUpstream": gemPortID,
+		},
+	}
+	meInstance, err := oFsm.pOmciCC.sendSetGemNCTPVar(log.WithSpanFromContext(context.TODO(), ctx),
+		oFsm.pDeviceHandler.pOpenOnuAc.omciTimeout, true, oFsm.pAdaptFsm.commChan, meParams)
+	if err != nil {
+		logger.Errorw(ctx, "GemNCTP set failed", log.Fields{"device-id": oFsm.deviceID})
+		return err
+	}
+	oFsm.pLastTxMeInstance = meInstance
+	err = oFsm.waitforOmciResponse(ctx)
+	if err != nil {
+		logger.Errorw(ctx, "Upstream Traffic Descriptor set failed, aborting VlanConfig FSM!", log.Fields{"device-id": oFsm.deviceID})
+		return err
+	}
+	return nil
+}
+
 // IsFlowRemovePending returns true if there are pending flows to remove, else false.
 func (oFsm *UniVlanConfigFsm) IsFlowRemovePending(aFlowDeleteChannel chan<- bool) bool {
 	oFsm.mutexFlowParams.Lock()
diff --git a/internal/pkg/onuadaptercore/onu_uni_tp.go b/internal/pkg/onuadaptercore/onu_uni_tp.go
index 246bd94..0f2b8e6 100644
--- a/internal/pkg/onuadaptercore/onu_uni_tp.go
+++ b/internal/pkg/onuadaptercore/onu_uni_tp.go
@@ -936,6 +936,28 @@
 	return gemPortIds
 }
 
+func (onuTP *onuUniTechProf) getBidirectionalGemPortIDsForTP(ctx context.Context, aUniID uint8, aTpID uint8) []uint16 {
+	uniTpKey := uniTP{uniID: aUniID, tpID: aTpID}
+	onuTP.mutexTPState.RLock()
+	defer onuTP.mutexTPState.RUnlock()
+	gemPortIds := make([]uint16, 0)
+	if techProfile, existTP := onuTP.mapPonAniConfig[uniTpKey]; existTP {
+		logger.Debugw(ctx, "TechProfile exist", log.Fields{"device-id": onuTP.deviceID})
+		for _, gemPortParam := range techProfile.mapGemPortParams {
+			if !gemPortParam.isMulticast {
+				logger.Debugw(ctx, "Detected unicast gemPort", log.Fields{"device-id": onuTP.deviceID,
+					"aUniID": aUniID, "aTPID": aTpID, "uniTPKey": uniTpKey,
+					"GemId": gemPortParam.multicastGemPortID})
+				gemPortIds = append(gemPortIds, gemPortParam.gemPortID)
+			}
+		}
+	} else {
+		logger.Debugw(ctx, "TechProfile doesn't exist", log.Fields{"device-id": onuTP.deviceID})
+	} //else: the state is just ignored (does not exist)
+	logger.Debugw(ctx, "Gem PortID list", log.Fields{"device-id": onuTP.deviceID, "gemportList": gemPortIds})
+	return gemPortIds
+}
+
 func (onuTP *onuUniTechProf) GetAllBidirectionalGemPortIDsForOnu() []uint16 {
 	var gemPortInstIDs []uint16
 	onuTP.mutexTPState.RLock()
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/meters/common.go b/vendor/github.com/opencord/voltha-lib-go/v5/pkg/meters/common.go
new file mode 100644
index 0000000..e058e48
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v5/pkg/meters/common.go
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package meters
+
+import (
+	"github.com/opencord/voltha-lib-go/v5/pkg/log"
+)
+
+var logger log.CLogger
+
+func init() {
+	// Setup this package so that it's log level can be modified at run time
+	var err error
+	logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{})
+	if err != nil {
+		panic(err)
+	}
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/meters/meter_utils.go b/vendor/github.com/opencord/voltha-lib-go/v5/pkg/meters/meter_utils.go
new file mode 100644
index 0000000..d220c0b
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v5/pkg/meters/meter_utils.go
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package meters
+
+import (
+	"context"
+	"fmt"
+	"github.com/opencord/voltha-lib-go/v5/pkg/log"
+	ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
+	tp_pb "github.com/opencord/voltha-protos/v4/go/tech_profile"
+)
+
+// GetTrafficShapingInfo returns CIR,PIR and GIR values
+func GetTrafficShapingInfo(ctx context.Context, meterConfig *ofp.OfpMeterConfig) (*tp_pb.TrafficShapingInfo, error) {
+	switch meterBandSize := len(meterConfig.Bands); {
+	case meterBandSize == 1:
+		band := meterConfig.Bands[0]
+		if band.BurstSize == 0 { // GIR, tcont type 1
+			return &tp_pb.TrafficShapingInfo{Gir: band.Rate}, nil
+		}
+		return &tp_pb.TrafficShapingInfo{Pir: band.Rate, Pbs: band.BurstSize}, nil // PIR, tcont type 4
+	case meterBandSize == 2:
+		firstBand, secondBand := meterConfig.Bands[0], meterConfig.Bands[1]
+		if firstBand.BurstSize == 0 && secondBand.BurstSize == 0 &&
+			firstBand.Rate == secondBand.Rate { // PIR == GIR, tcont type 1
+			return &tp_pb.TrafficShapingInfo{Pir: firstBand.Rate, Gir: secondBand.Rate}, nil
+		}
+		if firstBand.BurstSize > 0 && secondBand.BurstSize > 0 { // PIR, CIR, tcont type 2 or 3
+			if firstBand.Rate > secondBand.Rate { // always PIR >= CIR
+				return &tp_pb.TrafficShapingInfo{Pir: firstBand.Rate, Pbs: firstBand.BurstSize, Cir: secondBand.Rate, Cbs: secondBand.BurstSize}, nil
+			}
+			return &tp_pb.TrafficShapingInfo{Pir: secondBand.Rate, Pbs: secondBand.BurstSize, Cir: firstBand.Rate, Cbs: firstBand.BurstSize}, nil
+		}
+	case meterBandSize == 3: // PIR,CIR,GIR, tcont type 5
+		var count, girIndex int
+		for i, band := range meterConfig.Bands {
+			if band.BurstSize == 0 { // find GIR
+				count = count + 1
+				girIndex = i
+			}
+		}
+		if count == 1 {
+			bands := make([]*ofp.OfpMeterBandHeader, len(meterConfig.Bands))
+			copy(bands, meterConfig.Bands)
+			pirCirBands := append(bands[:girIndex], bands[girIndex+1:]...)
+			firstBand, secondBand := pirCirBands[0], pirCirBands[1]
+			if firstBand.Rate > secondBand.Rate {
+				return &tp_pb.TrafficShapingInfo{Pir: firstBand.Rate, Pbs: firstBand.BurstSize, Cir: secondBand.Rate, Cbs: secondBand.BurstSize, Gir: meterConfig.Bands[girIndex].Rate}, nil
+			}
+			return &tp_pb.TrafficShapingInfo{Pir: secondBand.Rate, Pbs: secondBand.BurstSize, Cir: firstBand.Rate, Cbs: firstBand.BurstSize, Gir: meterConfig.Bands[girIndex].Rate}, nil
+		}
+	default:
+		logger.Errorw(ctx, "invalid-meter-config", log.Fields{"meter-config": meterConfig})
+		return nil, fmt.Errorf("invalid-meter-config: %v", meterConfig)
+	}
+	return nil, fmt.Errorf("invalid-meter-config: %v", meterConfig)
+}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 7ff3d91..eedfe13 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -78,6 +78,7 @@
 github.com/opencord/voltha-lib-go/v5/pkg/flows
 github.com/opencord/voltha-lib-go/v5/pkg/kafka
 github.com/opencord/voltha-lib-go/v5/pkg/log
+github.com/opencord/voltha-lib-go/v5/pkg/meters
 github.com/opencord/voltha-lib-go/v5/pkg/probe
 github.com/opencord/voltha-lib-go/v5/pkg/version
 # github.com/opencord/voltha-protos/v4 v4.2.0