VOL-1623-meter support and handling  techprofile and fix for flow delete , now migrated to onosproject/onos:1.13.9-rc4

Change in flowupdate API towards adapters

Remove meter_get API from adapter to core

Added dependent vendor library files downloaded  by "dep-ensure -update"

Added techprofile changes in the single commit

Review comments are addressed

submiting patch for  integration tests for meter changes and modifications in unit test for updated flow decomposer logic
  - submitting on behalf of "Salman.Siddiqui@radisys.com"

Load test for meter updated and other flow management test cases with meter
- Performed load test for 1K meters serially and parallely and added more TC in flow management

Rebased

Load test for meter updated and other flow management test cases with meter
- Performed load test for 1K meters serially and parallely and added more TC in flow management
- submitting on behalf of "Salman.Siddiqui@radisys.com"

pulled latest protos

verified EAPOL/DHCP/HSIA data with Edgecore OLT & TW ONT kit for one subcriber
verified delete/re-add is working end to end for the same subscriber

Change-Id: Idb232b7a0f05dc0c7e68266ac885740a3adff317
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 70349d8..49e1463 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -45,6 +45,7 @@
 	deviceGraph        *graph.DeviceGraph
 	flowProxy          *model.Proxy
 	groupProxy         *model.Proxy
+	meterProxy         *model.Proxy
 	ldProxy            *model.Proxy
 	portProxies        map[string]*model.Proxy
 	portProxiesLock    sync.RWMutex
@@ -139,6 +140,10 @@
 		ctx,
 		fmt.Sprintf("/logical_devices/%s/flows", agent.logicalDeviceId),
 		false)
+	agent.meterProxy = agent.clusterDataProxy.CreateProxy(
+		ctx,
+		fmt.Sprintf("/logical_devices/%s/meters", agent.logicalDeviceId),
+		false)
 	agent.groupProxy = agent.clusterDataProxy.CreateProxy(
 		ctx,
 		fmt.Sprintf("/logical_devices/%s/flow_groups", agent.logicalDeviceId),
@@ -199,6 +204,18 @@
 	return nil, status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceId)
 }
 
+func (agent *LogicalDeviceAgent) ListLogicalDeviceMeters() (*ofp.Meters, error) {
+	log.Debug("ListLogicalDeviceMeters")
+	agent.lockLogicalDevice.RLock()
+	defer agent.lockLogicalDevice.RUnlock()
+	logicalDevice := agent.clusterDataProxy.Get(context.Background(), "/logical_devices/"+agent.logicalDeviceId, 0, false, "")
+	if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
+		cMeters := (proto.Clone(lDevice.Meters)).(*ofp.Meters)
+		return cMeters, nil
+	}
+	return nil, status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceId)
+}
+
 func (agent *LogicalDeviceAgent) ListLogicalDeviceFlowGroups() (*ofp.FlowGroups, error) {
 	log.Debug("ListLogicalDeviceFlowGroups")
 	agent.lockLogicalDevice.RLock()
@@ -261,6 +278,16 @@
 }
 
 //updateLogicalDeviceWithoutLock updates the model with the logical device.  It clones the logicaldevice before saving it
+func (agent *LogicalDeviceAgent) updateLogicalDeviceMetersWithoutLock(meters *ofp.Meters) error {
+	updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+	afterUpdate := agent.meterProxy.Update(updateCtx, "/", meters, false, "")
+	if afterUpdate == nil {
+		return status.Errorf(codes.Internal, "failed-updating-logical-device-meters:%s", agent.logicalDeviceId)
+	}
+	return nil
+}
+
+//updateLogicalDeviceWithoutLock updates the model with the logical device.  It clones the logicaldevice before saving it
 func (agent *LogicalDeviceAgent) updateLogicalDeviceFlowGroupsWithoutLock(flowGroups *ofp.FlowGroups) error {
 	updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
 	afterUpdate := agent.groupProxy.Update(updateCtx, "/", flowGroups, false, "")
@@ -520,6 +547,222 @@
 		"unhandled-command: lDeviceId:%s, command:%s", agent.logicalDeviceId, groupMod.GetCommand())
 }
 
+// updateMeterTable updates the meter table of that logical device
+func (agent *LogicalDeviceAgent) updateMeterTable(ctx context.Context, meterMod *ofp.OfpMeterMod) error {
+	log.Debug("updateMeterTable")
+	if meterMod == nil {
+		return nil
+	}
+	switch meterMod.GetCommand() {
+	case ofp.OfpMeterModCommand_OFPMC_ADD:
+		return agent.meterAdd(meterMod)
+	case ofp.OfpMeterModCommand_OFPMC_DELETE:
+		return agent.meterDelete(meterMod)
+	case ofp.OfpMeterModCommand_OFPMC_MODIFY:
+		return agent.meterModify(meterMod)
+	}
+	return status.Errorf(codes.Internal,
+		"unhandled-command: lDeviceId:%s, command:%s", agent.logicalDeviceId, meterMod.GetCommand())
+
+}
+
+func (agent *LogicalDeviceAgent) meterAdd(meterMod *ofp.OfpMeterMod) error {
+	log.Debugw("meterAdd", log.Fields{"metermod": *meterMod})
+	if meterMod == nil {
+		return nil
+	}
+	log.Debug("Waiting for logical device lock!!")
+	agent.lockLogicalDevice.Lock()
+	defer agent.lockLogicalDevice.Unlock()
+	log.Debug("Acquired logical device lock")
+	var lDevice *voltha.LogicalDevice
+	var err error
+	if lDevice, err = agent.getLogicalDeviceWithoutLock(); err != nil {
+		log.Errorw("no-logical-device-present", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+		return errors.New(fmt.Sprintf("no-logical-device-present:%s", agent.logicalDeviceId))
+	}
+
+	var meters []*ofp.OfpMeterEntry
+	if lDevice.Meters != nil && lDevice.Meters.Items != nil {
+		meters = lDevice.Meters.Items
+	}
+	log.Debugw("Available meters", log.Fields{"meters": meters})
+
+	for _, meter := range meters {
+		if meterMod.MeterId == meter.Config.MeterId {
+			log.Infow("Meter-already-exists", log.Fields{"meter": *meterMod})
+			return nil
+		}
+	}
+
+	meterEntry := fu.MeterEntryFromMeterMod(meterMod)
+	meters = append(meters, meterEntry)
+	//Update model
+	if err := agent.updateLogicalDeviceMetersWithoutLock(&ofp.Meters{Items: meters}); err != nil {
+		log.Errorw("db-meter-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+		return err
+	}
+	log.Debugw("Meter-added-successfully", log.Fields{"Added-meter": meterEntry, "updated-meters": lDevice.Meters})
+	return nil
+}
+
+func (agent *LogicalDeviceAgent) meterDelete(meterMod *ofp.OfpMeterMod) error {
+	log.Debug("meterDelete", log.Fields{"meterMod": *meterMod})
+	if meterMod == nil {
+		return nil
+	}
+	agent.lockLogicalDevice.Lock()
+	defer agent.lockLogicalDevice.Unlock()
+
+	var lDevice *voltha.LogicalDevice
+	var err error
+	if lDevice, err = agent.getLogicalDeviceWithoutLock(); err != nil {
+		log.Errorw("no-logical-device-present", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+		return errors.New(fmt.Sprintf("no-logical-device-present:%s", agent.logicalDeviceId))
+	}
+
+	var meters []*ofp.OfpMeterEntry
+	var flows []*ofp.OfpFlowStats
+	updatedFlows := make([]*ofp.OfpFlowStats, 0)
+	if lDevice.Meters != nil && lDevice.Meters.Items != nil {
+		meters = lDevice.Meters.Items
+	}
+	if lDevice.Flows != nil && lDevice.Flows.Items != nil {
+		flows = lDevice.Flows.Items
+	}
+
+	changedMeter := false
+	changedFow := false
+	log.Debugw("Available meters", log.Fields{"meters": meters})
+	for index, meter := range meters {
+		if meterMod.MeterId == meter.Config.MeterId {
+			flows = lDevice.Flows.Items
+			changedFow, updatedFlows = agent.getUpdatedFlowsAfterDeletebyMeterId(flows, meterMod.MeterId)
+			meters = append(meters[:index], meters[index+1:]...)
+			log.Debugw("Meter has been deleted", log.Fields{"meter": meter, "index": index})
+			changedMeter = true
+			break
+		}
+	}
+	if changedMeter {
+		//Update model
+		metersToUpdate := &ofp.Meters{}
+		if lDevice.Meters != nil {
+			metersToUpdate = &ofp.Meters{Items: meters}
+		}
+		if err := agent.updateLogicalDeviceMetersWithoutLock(metersToUpdate); err != nil {
+			log.Errorw("db-meter-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+			return err
+		}
+		log.Debug("Meter-deleted-from-DB-successfully", log.Fields{"updatedMeters": metersToUpdate, "no-of-meter": len(metersToUpdate.Items)})
+
+	}
+	if changedFow {
+		//Update model
+		if err := agent.updateLogicalDeviceFlowsWithoutLock(&ofp.Flows{Items: updatedFlows}); err != nil {
+			log.Errorw("db-flow-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+			return err
+		}
+		log.Debug("Flows-associated-with-meter-deleted-from-DB-successfully",
+			log.Fields{"updated-no-of-flows": len(updatedFlows), "meter": meterMod.MeterId})
+	}
+	log.Debugw("meterDelete success", log.Fields{"meterID": meterMod.MeterId})
+	return nil
+}
+
+func (agent *LogicalDeviceAgent) meterModify(meterMod *ofp.OfpMeterMod) error {
+	log.Debug("meterModify")
+	if meterMod == nil {
+		return nil
+	}
+	agent.lockLogicalDevice.Lock()
+	defer agent.lockLogicalDevice.Unlock()
+
+	var lDevice *voltha.LogicalDevice
+	var err error
+	if lDevice, err = agent.getLogicalDeviceWithoutLock(); err != nil {
+		log.Errorw("no-logical-device-present", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+		return errors.New(fmt.Sprintf("no-logical-device-present:%s", agent.logicalDeviceId))
+	}
+
+	var meters []*ofp.OfpMeterEntry
+	if lDevice.Meters != nil && lDevice.Meters.Items != nil {
+		meters = lDevice.Meters.Items
+	}
+	changedMeter := false
+	for index, meter := range meters {
+		if meterMod.MeterId == meter.Config.MeterId {
+			newmeterEntry := fu.MeterEntryFromMeterMod(meterMod)
+			newmeterEntry.Stats.FlowCount = meter.Stats.FlowCount
+			meters[index] = newmeterEntry
+			changedMeter = true
+			log.Debugw("Found meter, replaced with new meter", log.Fields{"old meter": meter, "new meter": newmeterEntry})
+			break
+		}
+	}
+	if changedMeter {
+		//Update model
+		metersToUpdate := &ofp.Meters{}
+		if lDevice.Meters != nil {
+			metersToUpdate = &ofp.Meters{Items: meters}
+		}
+		if err := agent.updateLogicalDeviceMetersWithoutLock(metersToUpdate); err != nil {
+			log.Errorw("db-meter-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+			return err
+		}
+		log.Debugw("meter-updated-in-DB-successfully", log.Fields{"updated_meters": meters})
+		return nil
+	}
+
+	log.Errorw("Meter not found ", log.Fields{"meter": meterMod})
+	return errors.New(fmt.Sprintf("no-logical-device-present:%d", meterMod.MeterId))
+
+}
+
+func (agent *LogicalDeviceAgent) getUpdatedFlowsAfterDeletebyMeterId(flows []*ofp.OfpFlowStats, meterId uint32) (bool, []*ofp.OfpFlowStats) {
+	log.Infow("Delete flows matching meter", log.Fields{"meter": meterId})
+	changed := false
+	//updatedFlows := make([]*ofp.OfpFlowStats, 0)
+	for index := len(flows) - 1; index >= 0; index-- {
+		if mId := fu.GetMeterIdFromFlow(flows[index]); mId != 0 && mId == meterId {
+			log.Debugw("Flow to be deleted", log.Fields{"flow": flows[index], "index": index})
+			flows = append(flows[:index], flows[index+1:]...)
+			changed = true
+		}
+	}
+	return changed, flows
+}
+
+func (agent *LogicalDeviceAgent) updateFlowCountOfMeterStats(modCommand *ofp.OfpFlowMod, meters []*ofp.OfpMeterEntry, flow *ofp.OfpFlowStats) bool {
+
+	flowCommand := modCommand.GetCommand()
+	meterId := fu.GetMeterIdFromFlow(flow)
+	log.Debugw("Meter-id-in-flow-mod", log.Fields{"meterId": meterId})
+	if meterId == 0 {
+		log.Debugw("No meter present in the flow", log.Fields{"flow": *flow})
+		return false
+	}
+	if meters == nil {
+		log.Debug("No meters present in logical device")
+		return false
+	}
+	changedMeter := false
+	for _, meter := range meters {
+		if meterId == meter.Config.MeterId { // Found meter in Logicaldevice
+			if flowCommand == ofp.OfpFlowModCommand_OFPFC_ADD {
+				meter.Stats.FlowCount += 1
+				changedMeter = true
+			} else if flowCommand == ofp.OfpFlowModCommand_OFPFC_DELETE_STRICT {
+				meter.Stats.FlowCount -= 1
+				changedMeter = true
+			}
+			log.Debugw("Found meter, updated meter flow stats", log.Fields{" meterId": meterId})
+			break
+		}
+	}
+	return changedMeter
+}
+
 //flowAdd adds a flow to the flow table of that logical device
 func (agent *LogicalDeviceAgent) flowAdd(mod *ofp.OfpFlowMod) error {
 	log.Debug("flowAdd")
@@ -537,12 +780,19 @@
 	}
 
 	var flows []*ofp.OfpFlowStats
+	var meters []*ofp.OfpMeterEntry
+	var flow *ofp.OfpFlowStats
+
 	if lDevice.Flows != nil && lDevice.Flows.Items != nil {
 		flows = lDevice.Flows.Items
 	}
 
+	if lDevice.Meters != nil && lDevice.Meters.Items != nil {
+		meters = lDevice.Meters.Items
+	}
 	updatedFlows := make([]*ofp.OfpFlowStats, 0)
 	changed := false
+	updated := false
 	checkOverlap := (mod.Flags & uint32(ofp.OfpFlowModFlags_OFPFF_CHECK_OVERLAP)) != 0
 	if checkOverlap {
 		if overlapped := fu.FindOverlappingFlows(flows, mod); len(overlapped) != 0 {
@@ -550,13 +800,13 @@
 			log.Warnw("overlapped-flows", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
 		} else {
 			//	Add flow
-			flow := fu.FlowStatsEntryFromFlowModMessage(mod)
+			flow = fu.FlowStatsEntryFromFlowModMessage(mod)
 			flows = append(flows, flow)
 			updatedFlows = append(updatedFlows, flow)
 			changed = true
 		}
 	} else {
-		flow := fu.FlowStatsEntryFromFlowModMessage(mod)
+		flow = fu.FlowStatsEntryFromFlowModMessage(mod)
 		idx := fu.FindFlows(flows, flow)
 		if idx >= 0 {
 			oldFlow := flows[idx]
@@ -568,6 +818,7 @@
 				flows[idx] = flow
 				updatedFlows = append(updatedFlows, flow)
 				changed = true
+				updated = true
 			}
 		} else {
 			flows = append(flows, flow)
@@ -576,10 +827,15 @@
 		}
 	}
 	if changed {
+		var flowMetadata voltha.FlowMetadata
+		if err := agent.GetMeterConfig(updatedFlows, meters, &flowMetadata); err != nil { // This should never happen,meters should be installed before flow arrives
+			log.Error("Meter-referred-in-flows-not-present")
+			return err
+		}
 		deviceRules := agent.flowDecomposer.DecomposeRules(agent, ofp.Flows{Items: updatedFlows}, *lDevice.FlowGroups)
 		log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
 
-		if err := agent.addDeviceFlowsAndGroups(deviceRules); err != nil {
+		if err := agent.addDeviceFlowsAndGroups(deviceRules, &flowMetadata); err != nil {
 			log.Errorw("failure-updating-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "error": err})
 			return err
 		}
@@ -589,10 +845,55 @@
 			log.Errorw("db-flow-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 			return err
 		}
+		if !updated {
+			changedMeterStats := agent.updateFlowCountOfMeterStats(mod, meters, flow)
+			metersToUpdate := &ofp.Meters{}
+			if lDevice.Meters != nil {
+				metersToUpdate = &ofp.Meters{Items: meters}
+			}
+			if changedMeterStats {
+				//Update model
+				if err := agent.updateLogicalDeviceMetersWithoutLock(metersToUpdate); err != nil {
+					log.Errorw("db-meter-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+					return err
+				}
+				log.Debugw("meter-stats-updated-in-DB-successfully", log.Fields{"updated_meters": meters})
+
+			}
+		}
+
 	}
 	return nil
 }
 
+func (agent *LogicalDeviceAgent) GetMeterConfig(flows []*ofp.OfpFlowStats, meters []*ofp.OfpMeterEntry, metadata *voltha.FlowMetadata) error {
+	m := make(map[uint32]bool)
+	for _, flow := range flows {
+		if flowMeterID := fu.GetMeterIdFromFlow(flow); flowMeterID != 0 && m[flowMeterID] == false {
+			foundMeter := false
+			// Meter is present in the flow , Get from logical device
+			for _, meter := range meters {
+				if flowMeterID == meter.Config.MeterId {
+					metadata.Meters = append(metadata.Meters, meter.Config)
+					log.Debugw("Found meter in logical device",
+						log.Fields{"meterID": flowMeterID, "meter-band": meter.Config})
+					m[flowMeterID] = true
+					foundMeter = true
+					break
+				}
+			}
+			if !foundMeter {
+				log.Errorw("Meter-referred-by-flow-is-not-found-in-logicaldevice",
+					log.Fields{"meterID": flowMeterID, "Avaliable-meters": meters, "flow": *flow})
+				return errors.New("Meter-referred-by-flow-is-not-found-in-logicaldevice")
+			}
+		}
+	}
+	log.Debugw("meter-bands-for-flows", log.Fields{"flows": len(flows), "metadata": metadata})
+	return nil
+
+}
+
 //flowDelete deletes a flow from the flow table of that logical device
 func (agent *LogicalDeviceAgent) flowDelete(mod *ofp.OfpFlowMod) error {
 	log.Debug("flowDelete")
@@ -608,8 +909,17 @@
 		log.Errorw("no-logical-device-present", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 		return errors.New(fmt.Sprintf("no-logical-device-present:%s", agent.logicalDeviceId))
 	}
-	flows := lDevice.Flows.Items
 
+	var meters []*ofp.OfpMeterEntry
+	var flows []*ofp.OfpFlowStats
+
+	if lDevice.Flows != nil && lDevice.Flows.Items != nil {
+		flows = lDevice.Flows.Items
+	}
+
+	if lDevice.Meters != nil && lDevice.Meters.Items != nil {
+		meters = lDevice.Meters.Items
+	}
 	//build a list of what to keep vs what to delete
 	toKeep := make([]*ofp.OfpFlowStats, 0)
 	toDelete := make([]*ofp.OfpFlowStats, 0)
@@ -631,10 +941,15 @@
 
 	//Update flows
 	if len(toDelete) > 0 {
+		var flowMetadata voltha.FlowMetadata
+		if err := agent.GetMeterConfig(toDelete, meters, &flowMetadata); err != nil { // This should never happen
+			log.Error("Meter-referred-in-flows-not-present")
+			return errors.New("Meter-referred-in-flows-not-present")
+		}
 		deviceRules := agent.flowDecomposer.DecomposeRules(agent, ofp.Flows{Items: toDelete}, ofp.FlowGroups{})
 		log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
 
-		if err := agent.deleteDeviceFlowsAndGroups(deviceRules); err != nil {
+		if err := agent.deleteDeviceFlowsAndGroups(deviceRules, &flowMetadata); err != nil {
 			log.Errorw("failure-updating-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "error": err})
 			return err
 		}
@@ -649,15 +964,15 @@
 	return nil
 }
 
-func (agent *LogicalDeviceAgent) addDeviceFlowsAndGroups(deviceRules *fu.DeviceRules) error {
-	log.Debugw("addDeviceFlowsAndGroups", log.Fields{"logicalDeviceID": agent.logicalDeviceId})
+func (agent *LogicalDeviceAgent) addDeviceFlowsAndGroups(deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) error {
+	log.Debugw("addDeviceFlowsAndGroups", log.Fields{"logicalDeviceID": agent.logicalDeviceId, "deviceRules": deviceRules, "flowMetadata": flowMetadata})
 
 	chnlsList := make([]chan interface{}, 0)
 	for deviceId, value := range deviceRules.GetRules() {
 		ch := make(chan interface{})
 		chnlsList = append(chnlsList, ch)
 		go func(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry) {
-			if err := agent.deviceMgr.addFlowsAndGroups(deviceId, flows, groups); err != nil {
+			if err := agent.deviceMgr.addFlowsAndGroups(deviceId, flows, groups, flowMetadata); err != nil {
 				log.Errorw("flow-add-failed", log.Fields{"deviceID": deviceId, "error": err})
 				ch <- status.Errorf(codes.Internal, "flow-add-failed: %s", deviceId)
 			}
@@ -671,7 +986,7 @@
 	return nil
 }
 
-func (agent *LogicalDeviceAgent) deleteDeviceFlowsAndGroups(deviceRules *fu.DeviceRules) error {
+func (agent *LogicalDeviceAgent) deleteDeviceFlowsAndGroups(deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) error {
 	log.Debugw("deleteDeviceFlowsAndGroups", log.Fields{"logicalDeviceID": agent.logicalDeviceId})
 
 	chnlsList := make([]chan interface{}, 0)
@@ -679,7 +994,7 @@
 		ch := make(chan interface{})
 		chnlsList = append(chnlsList, ch)
 		go func(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry) {
-			if err := agent.deviceMgr.deleteFlowsAndGroups(deviceId, flows, groups); err != nil {
+			if err := agent.deviceMgr.deleteFlowsAndGroups(deviceId, flows, groups, flowMetadata); err != nil {
 				log.Error("flow-delete-failed", log.Fields{"deviceID": deviceId, "error": err})
 				ch <- status.Errorf(codes.Internal, "flow-delete-failed: %s", deviceId)
 			}
@@ -693,7 +1008,7 @@
 	return nil
 }
 
-func (agent *LogicalDeviceAgent) updateDeviceFlowsAndGroups(deviceRules *fu.DeviceRules) error {
+func (agent *LogicalDeviceAgent) updateDeviceFlowsAndGroups(deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) error {
 	log.Debugw("updateDeviceFlowsAndGroups", log.Fields{"logicalDeviceID": agent.logicalDeviceId})
 
 	chnlsList := make([]chan interface{}, 0)
@@ -701,7 +1016,7 @@
 		ch := make(chan interface{})
 		chnlsList = append(chnlsList, ch)
 		go func(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry) {
-			if err := agent.deviceMgr.updateFlowsAndGroups(deviceId, flows, groups); err != nil {
+			if err := agent.deviceMgr.updateFlowsAndGroups(deviceId, flows, groups, flowMetadata); err != nil {
 				log.Error("flow-update-failed", log.Fields{"deviceID": deviceId, "error": err})
 				ch <- status.Errorf(codes.Internal, "flow-update-failed: %s", deviceId)
 			}
@@ -730,22 +1045,50 @@
 		log.Errorw("no-logical-device-present", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 		return errors.New(fmt.Sprintf("no-logical-device-present:%s", agent.logicalDeviceId))
 	}
-	flows := lDevice.Flows.Items
-	changed := false
+	var meters []*ofp.OfpMeterEntry
+	var flows []*ofp.OfpFlowStats
+	if lDevice.Meters != nil && lDevice.Meters.Items != nil {
+		meters = lDevice.Meters.Items
+	}
+	if lDevice.Flows != nil && lDevice.Flows.Items != nil {
+		flows = lDevice.Flows.Items
+	}
+
+	changedFlow := false
+	changedMeter := false
 	flow := fu.FlowStatsEntryFromFlowModMessage(mod)
+	flowsToDelete := make([]*ofp.OfpFlowStats, 0)
 	idx := fu.FindFlows(flows, flow)
 	if idx >= 0 {
+		changedMeter = agent.updateFlowCountOfMeterStats(mod, meters, flow)
+		flowsToDelete = append(flowsToDelete, flows[idx])
 		flows = append(flows[:idx], flows[idx+1:]...)
-		changed = true
+		changedFlow = true
 	} else {
 		return errors.New(fmt.Sprintf("Cannot delete flow - %s", flow))
 	}
+	if changedMeter {
+		//Update model
+		metersToUpdate := &ofp.Meters{}
+		if lDevice.Meters != nil {
+			metersToUpdate = &ofp.Meters{Items: meters}
+		}
+		if err := agent.updateLogicalDeviceMetersWithoutLock(metersToUpdate); err != nil {
+			log.Errorw("db-meter-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+			return err
+		}
 
-	if changed {
-		deviceRules := agent.flowDecomposer.DecomposeRules(agent, ofp.Flows{Items: []*ofp.OfpFlowStats{flow}}, ofp.FlowGroups{})
+	}
+	if changedFlow {
+		var flowMetadata voltha.FlowMetadata
+		if err := agent.GetMeterConfig(flowsToDelete, meters, &flowMetadata); err != nil {
+			log.Error("Meter-referred-in-flows-not-present")
+			return err
+		}
+		deviceRules := agent.flowDecomposer.DecomposeRules(agent, ofp.Flows{Items: flowsToDelete}, ofp.FlowGroups{})
 		log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
 
-		if err := agent.deleteDeviceFlowsAndGroups(deviceRules); err != nil {
+		if err := agent.deleteDeviceFlowsAndGroups(deviceRules, &flowMetadata); err != nil {
 			log.Errorw("failure-deleting-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "error": err})
 			return err
 		}
@@ -788,8 +1131,7 @@
 
 		deviceRules := agent.flowDecomposer.DecomposeRules(agent, *lDevice.Flows, ofp.FlowGroups{Items: groups})
 		log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
-
-		if err := agent.addDeviceFlowsAndGroups(deviceRules); err != nil {
+		if err := agent.addDeviceFlowsAndGroups(deviceRules, nil); err != nil {
 			log.Errorw("failure-updating-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "error": err})
 			return err
 		}
@@ -841,7 +1183,7 @@
 		deviceRules := agent.flowDecomposer.DecomposeRules(agent, ofp.Flows{Items: flows}, ofp.FlowGroups{Items: groups})
 		log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
 
-		if err := agent.updateDeviceFlowsAndGroups(deviceRules); err != nil {
+		if err := agent.updateDeviceFlowsAndGroups(deviceRules, nil); err != nil {
 			log.Errorw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "error": err})
 			return err
 		}
@@ -891,7 +1233,7 @@
 		deviceRules := agent.flowDecomposer.DecomposeRules(agent, ofp.Flows{Items: lDevice.Flows.Items}, ofp.FlowGroups{Items: groups})
 		log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
 
-		if err := agent.updateDeviceFlowsAndGroups(deviceRules); err != nil {
+		if err := agent.updateDeviceFlowsAndGroups(deviceRules, nil); err != nil {
 			log.Errorw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "error": err})
 			return err
 		}