VOL-1623-meter support and handling  techprofile and fix for flow delete , now migrated to onosproject/onos:1.13.9-rc4

Change in flowupdate API towards adapters

Remove meter_get API from adapter to core

Added dependent vendor library files downloaded  by "dep-ensure -update"

Added techprofile changes in the single commit

Review comments are addressed

submiting patch for  integration tests for meter changes and modifications in unit test for updated flow decomposer logic
  - submitting on behalf of "Salman.Siddiqui@radisys.com"

Load test for meter updated and other flow management test cases with meter
- Performed load test for 1K meters serially and parallely and added more TC in flow management

Rebased

Load test for meter updated and other flow management test cases with meter
- Performed load test for 1K meters serially and parallely and added more TC in flow management
- submitting on behalf of "Salman.Siddiqui@radisys.com"

pulled latest protos

verified EAPOL/DHCP/HSIA data with Edgecore OLT & TW ONT kit for one subcriber
verified delete/re-add is working end to end for the same subscriber

Change-Id: Idb232b7a0f05dc0c7e68266ac885740a3adff317
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index 41f71a6..9511b9d 100755
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -418,11 +418,11 @@
 	return unPackResponse(rpc, deviceId, success, result)
 }
 
-func (ap *AdapterProxy) UpdateFlowsBulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups) error {
+func (ap *AdapterProxy) UpdateFlowsBulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata) error {
 	log.Debugw("UpdateFlowsBulk", log.Fields{"deviceId": device.Id, "flowsInUpdate": len(flows.Items), "groupsToUpdate": len(groups.Items)})
 	toTopic := ap.getAdapterTopic(device.Adapter)
 	rpc := "update_flows_bulk"
-	args := make([]*kafka.KVArg, 3)
+	args := make([]*kafka.KVArg, 4)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
 		Value: device,
@@ -435,6 +435,10 @@
 		Key:   "groups",
 		Value: groups,
 	}
+	args[3] = &kafka.KVArg{
+		Key:   "flow_metadata",
+		Value: flowMetadata,
+	}
 
 	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
@@ -443,7 +447,7 @@
 	return unPackResponse(rpc, device.Id, success, result)
 }
 
-func (ap *AdapterProxy) UpdateFlowsIncremental(device *voltha.Device, flowChanges *openflow_13.FlowChanges, groupChanges *openflow_13.FlowGroupChanges) error {
+func (ap *AdapterProxy) UpdateFlowsIncremental(device *voltha.Device, flowChanges *openflow_13.FlowChanges, groupChanges *openflow_13.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error {
 	log.Debugw("UpdateFlowsIncremental",
 		log.Fields{
 			"deviceId":       device.Id,
@@ -455,7 +459,7 @@
 		})
 	toTopic := ap.getAdapterTopic(device.Adapter)
 	rpc := "update_flows_incrementally"
-	args := make([]*kafka.KVArg, 3)
+	args := make([]*kafka.KVArg, 4)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
 		Value: device,
@@ -469,6 +473,10 @@
 		Value: groupChanges,
 	}
 
+	args[3] = &kafka.KVArg{
+		Key:   "flow_metadata",
+		Value: flowMetadata,
+	}
 	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 7b9e00b..a61ca25 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -215,16 +215,16 @@
 	ch <- nil
 }
 
-func (agent *DeviceAgent) sendBulkFlowsToAdapters(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, ch chan interface{}) {
-	if err := agent.adapterProxy.UpdateFlowsBulk(device, flows, groups); err != nil {
+func (agent *DeviceAgent) sendBulkFlowsToAdapters(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata, ch chan interface{}) {
+	if err := agent.adapterProxy.UpdateFlowsBulk(device, flows, groups, flowMetadata); err != nil {
 		log.Debugw("update-flow-bulk-error", log.Fields{"id": agent.lastData.Id, "error": err})
 		ch <- err
 	}
 	ch <- nil
 }
 
-func (agent *DeviceAgent) sendIncrementalFlowsToAdapters(device *voltha.Device, flows *ofp.FlowChanges, groups *ofp.FlowGroupChanges, ch chan interface{}) {
-	if err := agent.adapterProxy.UpdateFlowsIncremental(device, flows, groups); err != nil {
+func (agent *DeviceAgent) sendIncrementalFlowsToAdapters(device *voltha.Device, flows *ofp.FlowChanges, groups *ofp.FlowGroupChanges, flowMetadata *voltha.FlowMetadata, ch chan interface{}) {
+	if err := agent.adapterProxy.UpdateFlowsIncremental(device, flows, groups, flowMetadata); err != nil {
 		log.Debugw("update-flow-incremental-error", log.Fields{"id": agent.lastData.Id, "error": err})
 		ch <- err
 	}
@@ -233,8 +233,8 @@
 
 //addFlowsAndGroups adds the "newFlows" and "newGroups" from the existing flows/groups and sends the update to the
 //adapters
-func (agent *DeviceAgent) addFlowsAndGroups(newFlows []*ofp.OfpFlowStats, newGroups []*ofp.OfpGroupEntry) error {
-	log.Debugw("addFlowsAndGroups", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "groups": newGroups})
+func (agent *DeviceAgent) addFlowsAndGroups(newFlows []*ofp.OfpFlowStats, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
+	log.Debugw("addFlowsAndGroups", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "groups": newGroups, "flowMetadata": flowMetadata})
 
 	if (len(newFlows) | len(newGroups)) == 0 {
 		log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "groups": newGroups})
@@ -302,7 +302,7 @@
 			log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "groups": newGroups})
 			return nil
 		}
-		go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, chAdapters)
+		go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, flowMetadata, chAdapters)
 
 	} else {
 		flowChanges := &ofp.FlowChanges{
@@ -314,7 +314,7 @@
 			ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
 			ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
 		}
-		go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, chAdapters)
+		go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, flowMetadata, chAdapters)
 	}
 
 	// store the changed data
@@ -323,6 +323,7 @@
 	go agent.updateDeviceWithoutLockAsync(device, chdB)
 
 	if res := fu.WaitForNilOrErrorResponses(agent.defaultTimeout, chAdapters, chdB); res != nil {
+		log.Debugw("Failed to get response from adapter[or] DB", log.Fields{"result": res})
 		return status.Errorf(codes.Aborted, "errors-%s", res)
 	}
 
@@ -331,7 +332,7 @@
 
 //deleteFlowsAndGroups removes the "flowsToDel" and "groupsToDel" from the existing flows/groups and sends the update to the
 //adapters
-func (agent *DeviceAgent) deleteFlowsAndGroups(flowsToDel []*ofp.OfpFlowStats, groupsToDel []*ofp.OfpGroupEntry) error {
+func (agent *DeviceAgent) deleteFlowsAndGroups(flowsToDel []*ofp.OfpFlowStats, groupsToDel []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
 	log.Debugw("deleteFlowsAndGroups", log.Fields{"deviceId": agent.deviceId, "flows": flowsToDel, "groups": groupsToDel})
 
 	if (len(flowsToDel) | len(groupsToDel)) == 0 {
@@ -393,7 +394,7 @@
 			log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flowsToDel": flowsToDel, "groupsToDel": groupsToDel})
 			return nil
 		}
-		go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: flowsToKeep}, &voltha.FlowGroups{Items: groupsToKeep}, chAdapters)
+		go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: flowsToKeep}, &voltha.FlowGroups{Items: groupsToKeep}, flowMetadata, chAdapters)
 	} else {
 		flowChanges := &ofp.FlowChanges{
 			ToAdd:    &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
@@ -404,7 +405,7 @@
 			ToRemove: &voltha.FlowGroups{Items: groupsToDel},
 			ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
 		}
-		go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, chAdapters)
+		go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, flowMetadata, chAdapters)
 	}
 
 	// store the changed data
@@ -421,7 +422,7 @@
 
 //updateFlowsAndGroups replaces the existing flows and groups with "updatedFlows" and "updatedGroups" respectively. It
 //also sends the updates to the adapters
-func (agent *DeviceAgent) updateFlowsAndGroups(updatedFlows []*ofp.OfpFlowStats, updatedGroups []*ofp.OfpGroupEntry) error {
+func (agent *DeviceAgent) updateFlowsAndGroups(updatedFlows []*ofp.OfpFlowStats, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
 	log.Debugw("updateFlowsAndGroups", log.Fields{"deviceId": agent.deviceId, "flows": updatedFlows, "groups": updatedGroups})
 
 	if (len(updatedFlows) | len(updatedGroups)) == 0 {
@@ -457,7 +458,7 @@
 
 	// Process bulk flow update differently than incremental update
 	if !dType.AcceptsAddRemoveFlowUpdates {
-		go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, chAdapters)
+		go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, nil, chAdapters)
 	} else {
 		var flowsToAdd []*ofp.OfpFlowStats
 		var flowsToDelete []*ofp.OfpFlowStats
@@ -512,7 +513,7 @@
 			ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
 			ToUpdate: &voltha.FlowGroups{Items: updatedGroups},
 		}
-		go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, chAdapters)
+		go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, flowMetadata, chAdapters)
 	}
 
 	// store the updated data
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index 606e36f..f417b05 100755
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -671,26 +671,26 @@
 	return status.Errorf(codes.NotFound, "%s", deviceId)
 }
 
-func (dMgr *DeviceManager) addFlowsAndGroups(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry) error {
-	log.Debugw("addFlowsAndGroups", log.Fields{"deviceid": deviceId})
+func (dMgr *DeviceManager) addFlowsAndGroups(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
+	log.Debugw("addFlowsAndGroups", log.Fields{"deviceid": deviceId, "flowMetadata": flowMetadata})
 	if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
-		return agent.addFlowsAndGroups(flows, groups)
+		return agent.addFlowsAndGroups(flows, groups, flowMetadata)
 	}
 	return status.Errorf(codes.NotFound, "%s", deviceId)
 }
 
-func (dMgr *DeviceManager) deleteFlowsAndGroups(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry) error {
+func (dMgr *DeviceManager) deleteFlowsAndGroups(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
 	log.Debugw("deleteFlowsAndGroups", log.Fields{"deviceid": deviceId})
 	if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
-		return agent.deleteFlowsAndGroups(flows, groups)
+		return agent.deleteFlowsAndGroups(flows, groups, flowMetadata)
 	}
 	return status.Errorf(codes.NotFound, "%s", deviceId)
 }
 
-func (dMgr *DeviceManager) updateFlowsAndGroups(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry) error {
+func (dMgr *DeviceManager) updateFlowsAndGroups(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
 	log.Debugw("updateFlowsAndGroups", log.Fields{"deviceid": deviceId})
 	if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
-		return agent.updateFlowsAndGroups(flows, groups)
+		return agent.updateFlowsAndGroups(flows, groups, flowMetadata)
 	}
 	return status.Errorf(codes.NotFound, "%s", deviceId)
 }
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index f2c16e7..bdb3e39 100755
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -943,13 +943,17 @@
 	return nil, nil
 }
 
-//@TODO useless stub, what should this actually do?
-func (handler *APIHandler) GetMeterStatsOfLogicalDevice(
-	ctx context.Context,
-	in *common.ID,
-) (*openflow_13.MeterStatsReply, error) {
-	log.Debug("GetMeterStatsOfLogicalDevice-stub")
-	return nil, nil
+func (handler *APIHandler) ListLogicalDeviceMeters(ctx context.Context, id *voltha.ID) (*openflow_13.Meters, error) {
+
+	log.Debugw("ListLogicalDeviceMeters", log.Fields{"id": *id})
+	if handler.competeForTransaction() {
+		if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id: id.Id}); err != nil {
+			return nil, err // TODO: Return empty meter entry
+		} else {
+			defer txn.Close()
+		}
+	}
+	return handler.logicalDeviceMgr.ListLogicalDeviceMeters(ctx, id.Id)
 }
 
 //@TODO useless stub, what should this actually do?
@@ -986,11 +990,25 @@
 	return successResp, nil
 }
 
-//@TODO useless stub, what should this actually do?
-func (handler *APIHandler) UpdateLogicalDeviceMeterTable(
-	ctx context.Context,
-	in *openflow_13.MeterModUpdate,
-) (*empty.Empty, error) {
-	log.Debug("UpdateLogicalDeviceMeterTable-stub")
-	return nil, nil
+// This function sends meter mod request to logical device manager and waits for response
+func (handler *APIHandler) UpdateLogicalDeviceMeterTable(ctx context.Context, meter *openflow_13.MeterModUpdate) (*empty.Empty, error) {
+	log.Debugw("UpdateLogicalDeviceMeterTable-request",
+		log.Fields{"meter": meter, "test": common.TestModeKeys_api_test.String()})
+	if isTestMode(ctx) {
+		out := new(empty.Empty)
+		return out, nil
+	}
+
+	if handler.competeForTransaction() {
+		if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id: meter.Id}); err != nil {
+			return new(empty.Empty), err
+		} else {
+			defer txn.Close()
+		}
+	}
+
+	ch := make(chan interface{})
+	defer close(ch)
+	go handler.logicalDeviceMgr.updateMeterTable(ctx, meter.Id, meter.MeterMod, ch)
+	return waitForNilResponseOnSuccess(ctx, ch)
 }
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 70349d8..49e1463 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -45,6 +45,7 @@
 	deviceGraph        *graph.DeviceGraph
 	flowProxy          *model.Proxy
 	groupProxy         *model.Proxy
+	meterProxy         *model.Proxy
 	ldProxy            *model.Proxy
 	portProxies        map[string]*model.Proxy
 	portProxiesLock    sync.RWMutex
@@ -139,6 +140,10 @@
 		ctx,
 		fmt.Sprintf("/logical_devices/%s/flows", agent.logicalDeviceId),
 		false)
+	agent.meterProxy = agent.clusterDataProxy.CreateProxy(
+		ctx,
+		fmt.Sprintf("/logical_devices/%s/meters", agent.logicalDeviceId),
+		false)
 	agent.groupProxy = agent.clusterDataProxy.CreateProxy(
 		ctx,
 		fmt.Sprintf("/logical_devices/%s/flow_groups", agent.logicalDeviceId),
@@ -199,6 +204,18 @@
 	return nil, status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceId)
 }
 
+func (agent *LogicalDeviceAgent) ListLogicalDeviceMeters() (*ofp.Meters, error) {
+	log.Debug("ListLogicalDeviceMeters")
+	agent.lockLogicalDevice.RLock()
+	defer agent.lockLogicalDevice.RUnlock()
+	logicalDevice := agent.clusterDataProxy.Get(context.Background(), "/logical_devices/"+agent.logicalDeviceId, 0, false, "")
+	if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
+		cMeters := (proto.Clone(lDevice.Meters)).(*ofp.Meters)
+		return cMeters, nil
+	}
+	return nil, status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceId)
+}
+
 func (agent *LogicalDeviceAgent) ListLogicalDeviceFlowGroups() (*ofp.FlowGroups, error) {
 	log.Debug("ListLogicalDeviceFlowGroups")
 	agent.lockLogicalDevice.RLock()
@@ -261,6 +278,16 @@
 }
 
 //updateLogicalDeviceWithoutLock updates the model with the logical device.  It clones the logicaldevice before saving it
+func (agent *LogicalDeviceAgent) updateLogicalDeviceMetersWithoutLock(meters *ofp.Meters) error {
+	updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+	afterUpdate := agent.meterProxy.Update(updateCtx, "/", meters, false, "")
+	if afterUpdate == nil {
+		return status.Errorf(codes.Internal, "failed-updating-logical-device-meters:%s", agent.logicalDeviceId)
+	}
+	return nil
+}
+
+//updateLogicalDeviceWithoutLock updates the model with the logical device.  It clones the logicaldevice before saving it
 func (agent *LogicalDeviceAgent) updateLogicalDeviceFlowGroupsWithoutLock(flowGroups *ofp.FlowGroups) error {
 	updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
 	afterUpdate := agent.groupProxy.Update(updateCtx, "/", flowGroups, false, "")
@@ -520,6 +547,222 @@
 		"unhandled-command: lDeviceId:%s, command:%s", agent.logicalDeviceId, groupMod.GetCommand())
 }
 
+// updateMeterTable updates the meter table of that logical device
+func (agent *LogicalDeviceAgent) updateMeterTable(ctx context.Context, meterMod *ofp.OfpMeterMod) error {
+	log.Debug("updateMeterTable")
+	if meterMod == nil {
+		return nil
+	}
+	switch meterMod.GetCommand() {
+	case ofp.OfpMeterModCommand_OFPMC_ADD:
+		return agent.meterAdd(meterMod)
+	case ofp.OfpMeterModCommand_OFPMC_DELETE:
+		return agent.meterDelete(meterMod)
+	case ofp.OfpMeterModCommand_OFPMC_MODIFY:
+		return agent.meterModify(meterMod)
+	}
+	return status.Errorf(codes.Internal,
+		"unhandled-command: lDeviceId:%s, command:%s", agent.logicalDeviceId, meterMod.GetCommand())
+
+}
+
+func (agent *LogicalDeviceAgent) meterAdd(meterMod *ofp.OfpMeterMod) error {
+	log.Debugw("meterAdd", log.Fields{"metermod": *meterMod})
+	if meterMod == nil {
+		return nil
+	}
+	log.Debug("Waiting for logical device lock!!")
+	agent.lockLogicalDevice.Lock()
+	defer agent.lockLogicalDevice.Unlock()
+	log.Debug("Acquired logical device lock")
+	var lDevice *voltha.LogicalDevice
+	var err error
+	if lDevice, err = agent.getLogicalDeviceWithoutLock(); err != nil {
+		log.Errorw("no-logical-device-present", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+		return errors.New(fmt.Sprintf("no-logical-device-present:%s", agent.logicalDeviceId))
+	}
+
+	var meters []*ofp.OfpMeterEntry
+	if lDevice.Meters != nil && lDevice.Meters.Items != nil {
+		meters = lDevice.Meters.Items
+	}
+	log.Debugw("Available meters", log.Fields{"meters": meters})
+
+	for _, meter := range meters {
+		if meterMod.MeterId == meter.Config.MeterId {
+			log.Infow("Meter-already-exists", log.Fields{"meter": *meterMod})
+			return nil
+		}
+	}
+
+	meterEntry := fu.MeterEntryFromMeterMod(meterMod)
+	meters = append(meters, meterEntry)
+	//Update model
+	if err := agent.updateLogicalDeviceMetersWithoutLock(&ofp.Meters{Items: meters}); err != nil {
+		log.Errorw("db-meter-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+		return err
+	}
+	log.Debugw("Meter-added-successfully", log.Fields{"Added-meter": meterEntry, "updated-meters": lDevice.Meters})
+	return nil
+}
+
+func (agent *LogicalDeviceAgent) meterDelete(meterMod *ofp.OfpMeterMod) error {
+	log.Debug("meterDelete", log.Fields{"meterMod": *meterMod})
+	if meterMod == nil {
+		return nil
+	}
+	agent.lockLogicalDevice.Lock()
+	defer agent.lockLogicalDevice.Unlock()
+
+	var lDevice *voltha.LogicalDevice
+	var err error
+	if lDevice, err = agent.getLogicalDeviceWithoutLock(); err != nil {
+		log.Errorw("no-logical-device-present", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+		return errors.New(fmt.Sprintf("no-logical-device-present:%s", agent.logicalDeviceId))
+	}
+
+	var meters []*ofp.OfpMeterEntry
+	var flows []*ofp.OfpFlowStats
+	updatedFlows := make([]*ofp.OfpFlowStats, 0)
+	if lDevice.Meters != nil && lDevice.Meters.Items != nil {
+		meters = lDevice.Meters.Items
+	}
+	if lDevice.Flows != nil && lDevice.Flows.Items != nil {
+		flows = lDevice.Flows.Items
+	}
+
+	changedMeter := false
+	changedFow := false
+	log.Debugw("Available meters", log.Fields{"meters": meters})
+	for index, meter := range meters {
+		if meterMod.MeterId == meter.Config.MeterId {
+			flows = lDevice.Flows.Items
+			changedFow, updatedFlows = agent.getUpdatedFlowsAfterDeletebyMeterId(flows, meterMod.MeterId)
+			meters = append(meters[:index], meters[index+1:]...)
+			log.Debugw("Meter has been deleted", log.Fields{"meter": meter, "index": index})
+			changedMeter = true
+			break
+		}
+	}
+	if changedMeter {
+		//Update model
+		metersToUpdate := &ofp.Meters{}
+		if lDevice.Meters != nil {
+			metersToUpdate = &ofp.Meters{Items: meters}
+		}
+		if err := agent.updateLogicalDeviceMetersWithoutLock(metersToUpdate); err != nil {
+			log.Errorw("db-meter-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+			return err
+		}
+		log.Debug("Meter-deleted-from-DB-successfully", log.Fields{"updatedMeters": metersToUpdate, "no-of-meter": len(metersToUpdate.Items)})
+
+	}
+	if changedFow {
+		//Update model
+		if err := agent.updateLogicalDeviceFlowsWithoutLock(&ofp.Flows{Items: updatedFlows}); err != nil {
+			log.Errorw("db-flow-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+			return err
+		}
+		log.Debug("Flows-associated-with-meter-deleted-from-DB-successfully",
+			log.Fields{"updated-no-of-flows": len(updatedFlows), "meter": meterMod.MeterId})
+	}
+	log.Debugw("meterDelete success", log.Fields{"meterID": meterMod.MeterId})
+	return nil
+}
+
+func (agent *LogicalDeviceAgent) meterModify(meterMod *ofp.OfpMeterMod) error {
+	log.Debug("meterModify")
+	if meterMod == nil {
+		return nil
+	}
+	agent.lockLogicalDevice.Lock()
+	defer agent.lockLogicalDevice.Unlock()
+
+	var lDevice *voltha.LogicalDevice
+	var err error
+	if lDevice, err = agent.getLogicalDeviceWithoutLock(); err != nil {
+		log.Errorw("no-logical-device-present", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+		return errors.New(fmt.Sprintf("no-logical-device-present:%s", agent.logicalDeviceId))
+	}
+
+	var meters []*ofp.OfpMeterEntry
+	if lDevice.Meters != nil && lDevice.Meters.Items != nil {
+		meters = lDevice.Meters.Items
+	}
+	changedMeter := false
+	for index, meter := range meters {
+		if meterMod.MeterId == meter.Config.MeterId {
+			newmeterEntry := fu.MeterEntryFromMeterMod(meterMod)
+			newmeterEntry.Stats.FlowCount = meter.Stats.FlowCount
+			meters[index] = newmeterEntry
+			changedMeter = true
+			log.Debugw("Found meter, replaced with new meter", log.Fields{"old meter": meter, "new meter": newmeterEntry})
+			break
+		}
+	}
+	if changedMeter {
+		//Update model
+		metersToUpdate := &ofp.Meters{}
+		if lDevice.Meters != nil {
+			metersToUpdate = &ofp.Meters{Items: meters}
+		}
+		if err := agent.updateLogicalDeviceMetersWithoutLock(metersToUpdate); err != nil {
+			log.Errorw("db-meter-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+			return err
+		}
+		log.Debugw("meter-updated-in-DB-successfully", log.Fields{"updated_meters": meters})
+		return nil
+	}
+
+	log.Errorw("Meter not found ", log.Fields{"meter": meterMod})
+	return errors.New(fmt.Sprintf("no-logical-device-present:%d", meterMod.MeterId))
+
+}
+
+func (agent *LogicalDeviceAgent) getUpdatedFlowsAfterDeletebyMeterId(flows []*ofp.OfpFlowStats, meterId uint32) (bool, []*ofp.OfpFlowStats) {
+	log.Infow("Delete flows matching meter", log.Fields{"meter": meterId})
+	changed := false
+	//updatedFlows := make([]*ofp.OfpFlowStats, 0)
+	for index := len(flows) - 1; index >= 0; index-- {
+		if mId := fu.GetMeterIdFromFlow(flows[index]); mId != 0 && mId == meterId {
+			log.Debugw("Flow to be deleted", log.Fields{"flow": flows[index], "index": index})
+			flows = append(flows[:index], flows[index+1:]...)
+			changed = true
+		}
+	}
+	return changed, flows
+}
+
+func (agent *LogicalDeviceAgent) updateFlowCountOfMeterStats(modCommand *ofp.OfpFlowMod, meters []*ofp.OfpMeterEntry, flow *ofp.OfpFlowStats) bool {
+
+	flowCommand := modCommand.GetCommand()
+	meterId := fu.GetMeterIdFromFlow(flow)
+	log.Debugw("Meter-id-in-flow-mod", log.Fields{"meterId": meterId})
+	if meterId == 0 {
+		log.Debugw("No meter present in the flow", log.Fields{"flow": *flow})
+		return false
+	}
+	if meters == nil {
+		log.Debug("No meters present in logical device")
+		return false
+	}
+	changedMeter := false
+	for _, meter := range meters {
+		if meterId == meter.Config.MeterId { // Found meter in Logicaldevice
+			if flowCommand == ofp.OfpFlowModCommand_OFPFC_ADD {
+				meter.Stats.FlowCount += 1
+				changedMeter = true
+			} else if flowCommand == ofp.OfpFlowModCommand_OFPFC_DELETE_STRICT {
+				meter.Stats.FlowCount -= 1
+				changedMeter = true
+			}
+			log.Debugw("Found meter, updated meter flow stats", log.Fields{" meterId": meterId})
+			break
+		}
+	}
+	return changedMeter
+}
+
 //flowAdd adds a flow to the flow table of that logical device
 func (agent *LogicalDeviceAgent) flowAdd(mod *ofp.OfpFlowMod) error {
 	log.Debug("flowAdd")
@@ -537,12 +780,19 @@
 	}
 
 	var flows []*ofp.OfpFlowStats
+	var meters []*ofp.OfpMeterEntry
+	var flow *ofp.OfpFlowStats
+
 	if lDevice.Flows != nil && lDevice.Flows.Items != nil {
 		flows = lDevice.Flows.Items
 	}
 
+	if lDevice.Meters != nil && lDevice.Meters.Items != nil {
+		meters = lDevice.Meters.Items
+	}
 	updatedFlows := make([]*ofp.OfpFlowStats, 0)
 	changed := false
+	updated := false
 	checkOverlap := (mod.Flags & uint32(ofp.OfpFlowModFlags_OFPFF_CHECK_OVERLAP)) != 0
 	if checkOverlap {
 		if overlapped := fu.FindOverlappingFlows(flows, mod); len(overlapped) != 0 {
@@ -550,13 +800,13 @@
 			log.Warnw("overlapped-flows", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
 		} else {
 			//	Add flow
-			flow := fu.FlowStatsEntryFromFlowModMessage(mod)
+			flow = fu.FlowStatsEntryFromFlowModMessage(mod)
 			flows = append(flows, flow)
 			updatedFlows = append(updatedFlows, flow)
 			changed = true
 		}
 	} else {
-		flow := fu.FlowStatsEntryFromFlowModMessage(mod)
+		flow = fu.FlowStatsEntryFromFlowModMessage(mod)
 		idx := fu.FindFlows(flows, flow)
 		if idx >= 0 {
 			oldFlow := flows[idx]
@@ -568,6 +818,7 @@
 				flows[idx] = flow
 				updatedFlows = append(updatedFlows, flow)
 				changed = true
+				updated = true
 			}
 		} else {
 			flows = append(flows, flow)
@@ -576,10 +827,15 @@
 		}
 	}
 	if changed {
+		var flowMetadata voltha.FlowMetadata
+		if err := agent.GetMeterConfig(updatedFlows, meters, &flowMetadata); err != nil { // This should never happen,meters should be installed before flow arrives
+			log.Error("Meter-referred-in-flows-not-present")
+			return err
+		}
 		deviceRules := agent.flowDecomposer.DecomposeRules(agent, ofp.Flows{Items: updatedFlows}, *lDevice.FlowGroups)
 		log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
 
-		if err := agent.addDeviceFlowsAndGroups(deviceRules); err != nil {
+		if err := agent.addDeviceFlowsAndGroups(deviceRules, &flowMetadata); err != nil {
 			log.Errorw("failure-updating-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "error": err})
 			return err
 		}
@@ -589,10 +845,55 @@
 			log.Errorw("db-flow-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 			return err
 		}
+		if !updated {
+			changedMeterStats := agent.updateFlowCountOfMeterStats(mod, meters, flow)
+			metersToUpdate := &ofp.Meters{}
+			if lDevice.Meters != nil {
+				metersToUpdate = &ofp.Meters{Items: meters}
+			}
+			if changedMeterStats {
+				//Update model
+				if err := agent.updateLogicalDeviceMetersWithoutLock(metersToUpdate); err != nil {
+					log.Errorw("db-meter-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+					return err
+				}
+				log.Debugw("meter-stats-updated-in-DB-successfully", log.Fields{"updated_meters": meters})
+
+			}
+		}
+
 	}
 	return nil
 }
 
+func (agent *LogicalDeviceAgent) GetMeterConfig(flows []*ofp.OfpFlowStats, meters []*ofp.OfpMeterEntry, metadata *voltha.FlowMetadata) error {
+	m := make(map[uint32]bool)
+	for _, flow := range flows {
+		if flowMeterID := fu.GetMeterIdFromFlow(flow); flowMeterID != 0 && m[flowMeterID] == false {
+			foundMeter := false
+			// Meter is present in the flow , Get from logical device
+			for _, meter := range meters {
+				if flowMeterID == meter.Config.MeterId {
+					metadata.Meters = append(metadata.Meters, meter.Config)
+					log.Debugw("Found meter in logical device",
+						log.Fields{"meterID": flowMeterID, "meter-band": meter.Config})
+					m[flowMeterID] = true
+					foundMeter = true
+					break
+				}
+			}
+			if !foundMeter {
+				log.Errorw("Meter-referred-by-flow-is-not-found-in-logicaldevice",
+					log.Fields{"meterID": flowMeterID, "Avaliable-meters": meters, "flow": *flow})
+				return errors.New("Meter-referred-by-flow-is-not-found-in-logicaldevice")
+			}
+		}
+	}
+	log.Debugw("meter-bands-for-flows", log.Fields{"flows": len(flows), "metadata": metadata})
+	return nil
+
+}
+
 //flowDelete deletes a flow from the flow table of that logical device
 func (agent *LogicalDeviceAgent) flowDelete(mod *ofp.OfpFlowMod) error {
 	log.Debug("flowDelete")
@@ -608,8 +909,17 @@
 		log.Errorw("no-logical-device-present", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 		return errors.New(fmt.Sprintf("no-logical-device-present:%s", agent.logicalDeviceId))
 	}
-	flows := lDevice.Flows.Items
 
+	var meters []*ofp.OfpMeterEntry
+	var flows []*ofp.OfpFlowStats
+
+	if lDevice.Flows != nil && lDevice.Flows.Items != nil {
+		flows = lDevice.Flows.Items
+	}
+
+	if lDevice.Meters != nil && lDevice.Meters.Items != nil {
+		meters = lDevice.Meters.Items
+	}
 	//build a list of what to keep vs what to delete
 	toKeep := make([]*ofp.OfpFlowStats, 0)
 	toDelete := make([]*ofp.OfpFlowStats, 0)
@@ -631,10 +941,15 @@
 
 	//Update flows
 	if len(toDelete) > 0 {
+		var flowMetadata voltha.FlowMetadata
+		if err := agent.GetMeterConfig(toDelete, meters, &flowMetadata); err != nil { // This should never happen
+			log.Error("Meter-referred-in-flows-not-present")
+			return errors.New("Meter-referred-in-flows-not-present")
+		}
 		deviceRules := agent.flowDecomposer.DecomposeRules(agent, ofp.Flows{Items: toDelete}, ofp.FlowGroups{})
 		log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
 
-		if err := agent.deleteDeviceFlowsAndGroups(deviceRules); err != nil {
+		if err := agent.deleteDeviceFlowsAndGroups(deviceRules, &flowMetadata); err != nil {
 			log.Errorw("failure-updating-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "error": err})
 			return err
 		}
@@ -649,15 +964,15 @@
 	return nil
 }
 
-func (agent *LogicalDeviceAgent) addDeviceFlowsAndGroups(deviceRules *fu.DeviceRules) error {
-	log.Debugw("addDeviceFlowsAndGroups", log.Fields{"logicalDeviceID": agent.logicalDeviceId})
+func (agent *LogicalDeviceAgent) addDeviceFlowsAndGroups(deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) error {
+	log.Debugw("addDeviceFlowsAndGroups", log.Fields{"logicalDeviceID": agent.logicalDeviceId, "deviceRules": deviceRules, "flowMetadata": flowMetadata})
 
 	chnlsList := make([]chan interface{}, 0)
 	for deviceId, value := range deviceRules.GetRules() {
 		ch := make(chan interface{})
 		chnlsList = append(chnlsList, ch)
 		go func(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry) {
-			if err := agent.deviceMgr.addFlowsAndGroups(deviceId, flows, groups); err != nil {
+			if err := agent.deviceMgr.addFlowsAndGroups(deviceId, flows, groups, flowMetadata); err != nil {
 				log.Errorw("flow-add-failed", log.Fields{"deviceID": deviceId, "error": err})
 				ch <- status.Errorf(codes.Internal, "flow-add-failed: %s", deviceId)
 			}
@@ -671,7 +986,7 @@
 	return nil
 }
 
-func (agent *LogicalDeviceAgent) deleteDeviceFlowsAndGroups(deviceRules *fu.DeviceRules) error {
+func (agent *LogicalDeviceAgent) deleteDeviceFlowsAndGroups(deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) error {
 	log.Debugw("deleteDeviceFlowsAndGroups", log.Fields{"logicalDeviceID": agent.logicalDeviceId})
 
 	chnlsList := make([]chan interface{}, 0)
@@ -679,7 +994,7 @@
 		ch := make(chan interface{})
 		chnlsList = append(chnlsList, ch)
 		go func(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry) {
-			if err := agent.deviceMgr.deleteFlowsAndGroups(deviceId, flows, groups); err != nil {
+			if err := agent.deviceMgr.deleteFlowsAndGroups(deviceId, flows, groups, flowMetadata); err != nil {
 				log.Error("flow-delete-failed", log.Fields{"deviceID": deviceId, "error": err})
 				ch <- status.Errorf(codes.Internal, "flow-delete-failed: %s", deviceId)
 			}
@@ -693,7 +1008,7 @@
 	return nil
 }
 
-func (agent *LogicalDeviceAgent) updateDeviceFlowsAndGroups(deviceRules *fu.DeviceRules) error {
+func (agent *LogicalDeviceAgent) updateDeviceFlowsAndGroups(deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) error {
 	log.Debugw("updateDeviceFlowsAndGroups", log.Fields{"logicalDeviceID": agent.logicalDeviceId})
 
 	chnlsList := make([]chan interface{}, 0)
@@ -701,7 +1016,7 @@
 		ch := make(chan interface{})
 		chnlsList = append(chnlsList, ch)
 		go func(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry) {
-			if err := agent.deviceMgr.updateFlowsAndGroups(deviceId, flows, groups); err != nil {
+			if err := agent.deviceMgr.updateFlowsAndGroups(deviceId, flows, groups, flowMetadata); err != nil {
 				log.Error("flow-update-failed", log.Fields{"deviceID": deviceId, "error": err})
 				ch <- status.Errorf(codes.Internal, "flow-update-failed: %s", deviceId)
 			}
@@ -730,22 +1045,50 @@
 		log.Errorw("no-logical-device-present", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 		return errors.New(fmt.Sprintf("no-logical-device-present:%s", agent.logicalDeviceId))
 	}
-	flows := lDevice.Flows.Items
-	changed := false
+	var meters []*ofp.OfpMeterEntry
+	var flows []*ofp.OfpFlowStats
+	if lDevice.Meters != nil && lDevice.Meters.Items != nil {
+		meters = lDevice.Meters.Items
+	}
+	if lDevice.Flows != nil && lDevice.Flows.Items != nil {
+		flows = lDevice.Flows.Items
+	}
+
+	changedFlow := false
+	changedMeter := false
 	flow := fu.FlowStatsEntryFromFlowModMessage(mod)
+	flowsToDelete := make([]*ofp.OfpFlowStats, 0)
 	idx := fu.FindFlows(flows, flow)
 	if idx >= 0 {
+		changedMeter = agent.updateFlowCountOfMeterStats(mod, meters, flow)
+		flowsToDelete = append(flowsToDelete, flows[idx])
 		flows = append(flows[:idx], flows[idx+1:]...)
-		changed = true
+		changedFlow = true
 	} else {
 		return errors.New(fmt.Sprintf("Cannot delete flow - %s", flow))
 	}
+	if changedMeter {
+		//Update model
+		metersToUpdate := &ofp.Meters{}
+		if lDevice.Meters != nil {
+			metersToUpdate = &ofp.Meters{Items: meters}
+		}
+		if err := agent.updateLogicalDeviceMetersWithoutLock(metersToUpdate); err != nil {
+			log.Errorw("db-meter-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+			return err
+		}
 
-	if changed {
-		deviceRules := agent.flowDecomposer.DecomposeRules(agent, ofp.Flows{Items: []*ofp.OfpFlowStats{flow}}, ofp.FlowGroups{})
+	}
+	if changedFlow {
+		var flowMetadata voltha.FlowMetadata
+		if err := agent.GetMeterConfig(flowsToDelete, meters, &flowMetadata); err != nil {
+			log.Error("Meter-referred-in-flows-not-present")
+			return err
+		}
+		deviceRules := agent.flowDecomposer.DecomposeRules(agent, ofp.Flows{Items: flowsToDelete}, ofp.FlowGroups{})
 		log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
 
-		if err := agent.deleteDeviceFlowsAndGroups(deviceRules); err != nil {
+		if err := agent.deleteDeviceFlowsAndGroups(deviceRules, &flowMetadata); err != nil {
 			log.Errorw("failure-deleting-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "error": err})
 			return err
 		}
@@ -788,8 +1131,7 @@
 
 		deviceRules := agent.flowDecomposer.DecomposeRules(agent, *lDevice.Flows, ofp.FlowGroups{Items: groups})
 		log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
-
-		if err := agent.addDeviceFlowsAndGroups(deviceRules); err != nil {
+		if err := agent.addDeviceFlowsAndGroups(deviceRules, nil); err != nil {
 			log.Errorw("failure-updating-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "error": err})
 			return err
 		}
@@ -841,7 +1183,7 @@
 		deviceRules := agent.flowDecomposer.DecomposeRules(agent, ofp.Flows{Items: flows}, ofp.FlowGroups{Items: groups})
 		log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
 
-		if err := agent.updateDeviceFlowsAndGroups(deviceRules); err != nil {
+		if err := agent.updateDeviceFlowsAndGroups(deviceRules, nil); err != nil {
 			log.Errorw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "error": err})
 			return err
 		}
@@ -891,7 +1233,7 @@
 		deviceRules := agent.flowDecomposer.DecomposeRules(agent, ofp.Flows{Items: lDevice.Flows.Items}, ofp.FlowGroups{Items: groups})
 		log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
 
-		if err := agent.updateDeviceFlowsAndGroups(deviceRules); err != nil {
+		if err := agent.updateDeviceFlowsAndGroups(deviceRules, nil); err != nil {
 			log.Errorw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "error": err})
 			return err
 		}
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index a097736..71843ff 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -476,6 +476,25 @@
 	sendAPIResponse(ctx, ch, res)
 }
 
+func (ldMgr *LogicalDeviceManager) updateMeterTable(ctx context.Context, id string, meter *openflow_13.OfpMeterMod, ch chan interface{}) {
+	log.Debugw("updateMeterTable", log.Fields{"logicalDeviceId": id})
+	var res interface{}
+	if agent := ldMgr.getLogicalDeviceAgent(id); agent != nil {
+		res = agent.updateMeterTable(ctx, meter)
+		log.Debugw("updateMeterTable-result", log.Fields{"result": res})
+	} else {
+		res = status.Errorf(codes.NotFound, "%s", id)
+	}
+	sendAPIResponse(ctx, ch, res)
+}
+
+func (ldMgr *LogicalDeviceManager) ListLogicalDeviceMeters(ctx context.Context, id string) (*openflow_13.Meters, error) {
+	log.Debugw("ListLogicalDeviceMeters", log.Fields{"logicalDeviceId": id})
+	if agent := ldMgr.getLogicalDeviceAgent(id); agent != nil {
+		return agent.ListLogicalDeviceMeters()
+	}
+	return nil, status.Errorf(codes.NotFound, "%s", id)
+}
 func (ldMgr *LogicalDeviceManager) updateGroupTable(ctx context.Context, id string, groupMod *openflow_13.OfpGroupMod, ch chan interface{}) {
 	log.Debugw("updateGroupTable", log.Fields{"logicalDeviceId": id})
 	var res interface{}
diff --git a/rw_core/flow_decomposition/flow_decomposer.go b/rw_core/flow_decomposition/flow_decomposer.go
index 98d5092..09b29e8 100644
--- a/rw_core/flow_decomposition/flow_decomposer.go
+++ b/rw_core/flow_decomposition/flow_decomposer.go
@@ -105,6 +105,8 @@
 
 	log.Debugw("trap-flow", log.Fields{"inPortNo": inPortNo, "outPortNo": outPortNo, "flow": flow})
 	deviceRules := fu.NewDeviceRules()
+	meterId := fu.GetMeterIdFromFlow(flow)
+	metadataFromwriteMetadata := fu.GetMetadataFromWriteMetadataAction(flow)
 
 	egressHop := route[1]
 
@@ -128,10 +130,9 @@
 			var fa *fu.FlowArgs
 			// Upstream flow
 			fa = &fu.FlowArgs{
-				KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
+				KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie, "meter_id": uint64(meterId), "write_metadata": metadataFromwriteMetadata},
 				MatchFields: []*ofp.OfpOxmOfbField{
 					fu.InPort(egressHop.Ingress),
-					fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | inputPort),
 					fu.TunnelId(uint64(inputPort)),
 				},
 				Actions: []*ofp.OfpAction{
@@ -141,24 +142,7 @@
 				},
 			}
 			// Augment the matchfields with the ofpfields from the flow
-			fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT, fu.VLAN_VID)...)
-			fg.AddFlow(fu.MkFlowStat(fa))
-
-			// Downstream flow
-			fa = &fu.FlowArgs{
-				KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority)},
-				MatchFields: []*ofp.OfpOxmOfbField{
-					fu.InPort(egressHop.Egress),
-					fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 4000),
-					fu.VlanPcp(0),
-					fu.Metadata_ofp(uint64(inputPort)),
-					fu.TunnelId(uint64(inputPort)),
-				},
-				Actions: []*ofp.OfpAction{
-					fu.PopVlan(),
-					fu.Output(egressHop.Ingress),
-				},
-			}
+			fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT)...)
 			fg.AddFlow(fu.MkFlowStat(fa))
 		}
 	}
@@ -177,17 +161,21 @@
 	log.Debugw("upstream-non-controller-bound-flow", log.Fields{"inPortNo": inPortNo, "outPortNo": outPortNo})
 	deviceRules := fu.NewDeviceRules()
 
+	meterId := fu.GetMeterIdFromFlow(flow)
+	metadataFromwriteMetadata := fu.GetMetadataFromWriteMetadataAction(flow)
+
 	ingressHop := route[0]
 	egressHop := route[1]
 
-	if fu.HasNextTable(flow) {
-		log.Debugw("has-next-table", log.Fields{"table_id": flow.TableId})
+	if flow.TableId == 0 && fu.HasNextTable(flow) {
+		log.Debugw("decomposing-onu-flow-in-upstream-has-next-table", log.Fields{"table_id": flow.TableId})
 		if outPortNo != 0 {
 			log.Warnw("outPort-should-not-be-specified", log.Fields{"outPortNo": outPortNo})
+			return deviceRules
 		}
 		var fa *fu.FlowArgs
 		fa = &fu.FlowArgs{
-			KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
+			KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie, "meter_id": uint64(meterId), "write_metadata": metadataFromwriteMetadata},
 			MatchFields: []*ofp.OfpOxmOfbField{
 				fu.InPort(ingressHop.Ingress),
 				fu.TunnelId(uint64(inPortNo)),
@@ -203,73 +191,27 @@
 		fg := fu.NewFlowsAndGroups()
 		fg.AddFlow(fu.MkFlowStat(fa))
 		deviceRules.AddFlowsAndGroup(ingressHop.DeviceID, fg)
-	} else {
-		var actions []ofp.OfpActionType
-		var isOutputTypeInActions bool
-		for _, action := range fu.GetActions(flow) {
-			actions = append(actions, action.Type)
-			if !isOutputTypeInActions && action.Type == fu.OUTPUT {
-				isOutputTypeInActions = true
-			}
+	} else if flow.TableId == 1 && outPortNo != 0 {
+		log.Debugw("decomposing-olt-flow-in-upstream-has-next-table", log.Fields{"table_id": flow.TableId})
+		var fa *fu.FlowArgs
+		fa = &fu.FlowArgs{
+			KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie, "meter_id": uint64(meterId), "write_metadata": metadataFromwriteMetadata},
+			MatchFields: []*ofp.OfpOxmOfbField{
+				fu.InPort(egressHop.Ingress),
+				fu.TunnelId(uint64(inPortNo)),
+			},
 		}
-		if len(actions) == 1 && isOutputTypeInActions {
-			var fa *fu.FlowArgs
-			// child device flow
-			fa = &fu.FlowArgs{
-				KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
-				MatchFields: []*ofp.OfpOxmOfbField{
-					fu.InPort(ingressHop.Ingress),
-				},
-				Actions: []*ofp.OfpAction{
-					fu.Output(ingressHop.Egress),
-				},
-			}
-			// Augment the matchfields with the ofpfields from the flow
-			fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT)...)
-			fg := fu.NewFlowsAndGroups()
-			fg.AddFlow(fu.MkFlowStat(fa))
-			deviceRules.AddFlowsAndGroup(ingressHop.DeviceID, fg)
+		// Augment the matchfields with the ofpfields from the flow
+		fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT)...)
 
-			// parent device flow
-			fa = &fu.FlowArgs{
-				KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
-				MatchFields: []*ofp.OfpOxmOfbField{
-					fu.InPort(egressHop.Ingress), //egress_hop.ingress_port.port_no
-					fu.TunnelId(uint64(inPortNo)),
-				},
-				Actions: []*ofp.OfpAction{
-					fu.Output(egressHop.Egress),
-				},
-			}
-			// Augment the matchfields with the ofpfields from the flow
-			fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT)...)
-			fg = fu.NewFlowsAndGroups()
-			fg.AddFlow(fu.MkFlowStat(fa))
-			deviceRules.AddFlowsAndGroup(egressHop.DeviceID, fg)
-		} else {
-			if outPortNo == 0 {
-				log.Warnw("outPort-should-be-specified", log.Fields{"outPortNo": outPortNo})
-			}
-			var fa *fu.FlowArgs
-			fa = &fu.FlowArgs{
-				KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
-				MatchFields: []*ofp.OfpOxmOfbField{
-					fu.InPort(egressHop.Ingress),
-					fu.TunnelId(uint64(inPortNo)),
-				},
-			}
-			// Augment the matchfields with the ofpfields from the flow
-			fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT)...)
+		//Augment the actions
+		filteredAction := fu.GetActions(flow, fu.OUTPUT)
+		filteredAction = append(filteredAction, fu.Output(egressHop.Egress))
+		fa.Actions = filteredAction
 
-			//Augment the actions
-			filteredAction := fu.GetActions(flow, fu.OUTPUT)
-			filteredAction = append(filteredAction, fu.Output(egressHop.Egress))
-			fa.Actions = filteredAction
-
-			fg := fu.NewFlowsAndGroups()
-			fg.AddFlow(fu.MkFlowStat(fa))
-			deviceRules.AddFlowsAndGroup(egressHop.DeviceID, fg)
-		}
+		fg := fu.NewFlowsAndGroups()
+		fg.AddFlow(fu.MkFlowStat(fa))
+		deviceRules.AddFlowsAndGroup(egressHop.DeviceID, fg)
 	}
 	return deviceRules
 }
@@ -277,25 +219,32 @@
 // processDownstreamFlowWithNextTable decomposes downstream flows containing next table ID instructions
 func (fd *FlowDecomposer) processDownstreamFlowWithNextTable(agent coreIf.LogicalDeviceAgent, route []graph.RouteHop,
 	inPortNo uint32, outPortNo uint32, flow *ofp.OfpFlowStats) *fu.DeviceRules {
-
-	log.Debugw("downstream-flow-with-next-table", log.Fields{"inPortNo": inPortNo, "outPortNo": outPortNo})
+	log.Debugw("decomposing-olt-flow-in-downstream-flow-with-next-table", log.Fields{"inPortNo": inPortNo, "outPortNo": outPortNo})
 	deviceRules := fu.NewDeviceRules()
+	meterId := fu.GetMeterIdFromFlow(flow)
+	metadataFromwriteMetadata := fu.GetMetadataFromWriteMetadataAction(flow)
 
 	if outPortNo != 0 {
 		log.Warnw("outPort-should-not-be-specified", log.Fields{"outPortNo": outPortNo})
+		return deviceRules
 	}
+
+	if flow.TableId != 0 {
+		log.Warnw("This is not olt pipeline table, so skipping", log.Fields{"tableId": flow.TableId})
+		return deviceRules
+	}
+
 	ingressHop := route[0]
 	egressHop := route[1]
-
-	if fu.GetMetaData(flow) != 0 {
+	if metadataFromwriteMetadata != 0 {
 		log.Debugw("creating-metadata-flow", log.Fields{"flow": flow})
-		portNumber := uint32(fu.GetPortNumberFromMetadata(flow))
+		portNumber := fu.GetEgressPortNumberFromWriteMetadata(flow)
 		if portNumber != 0 {
 			recalculatedRoute := agent.GetRoute(inPortNo, portNumber)
 			switch len(recalculatedRoute) {
 			case 0:
-				log.Errorw("no-route-double-tag", log.Fields{"inPortNo": inPortNo, "outPortNo": portNumber, "comment": "deleting-flow", "metadata": fu.GetMetaData64Bit(flow)})
-				//	TODO: Delete flow
+				log.Errorw("no-route-double-tag", log.Fields{"inPortNo": inPortNo, "outPortNo": portNumber, "comment": "deleting-flow", "metadata": metadataFromwriteMetadata})
+				//TODO: Delete flow
 				return deviceRules
 			case 2:
 				log.Debugw("route-found", log.Fields{"ingressHop": ingressHop, "egressHop": egressHop})
@@ -308,16 +257,16 @@
 		}
 		innerTag := fu.GetInnerTagFromMetaData(flow)
 		if innerTag == 0 {
-			log.Errorw("no-inner-route-double-tag", log.Fields{"inPortNo": inPortNo, "outPortNo": portNumber, "comment": "deleting-flow", "metadata": fu.GetMetaData64Bit(flow)})
-			//	TODO: Delete flow
+			log.Errorw("no-inner-route-double-tag", log.Fields{"inPortNo": inPortNo, "outPortNo": portNumber, "comment": "deleting-flow", "metadata": metadataFromwriteMetadata})
+			//TODO: Delete flow
 			return deviceRules
 		}
 		var fa *fu.FlowArgs
 		fa = &fu.FlowArgs{
-			KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
+			KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie, "meter_id": uint64(meterId), "write_metadata": metadataFromwriteMetadata},
 			MatchFields: []*ofp.OfpOxmOfbField{
 				fu.InPort(ingressHop.Ingress),
-				fu.Metadata_ofp(innerTag),
+				fu.Metadata_ofp(uint64(innerTag)),
 				fu.TunnelId(uint64(portNumber)),
 			},
 			Actions: fu.GetActions(flow),
@@ -335,7 +284,7 @@
 		log.Debugw("creating-standard-flow", log.Fields{"flow": flow})
 		var fa *fu.FlowArgs
 		fa = &fu.FlowArgs{
-			KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
+			KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie, "meter_id": uint64(meterId), "write_metadata": metadataFromwriteMetadata},
 			MatchFields: []*ofp.OfpOxmOfbField{
 				fu.InPort(ingressHop.Ingress),
 				fu.TunnelId(uint64(inPortNo)),
@@ -360,76 +309,31 @@
 func (fd *FlowDecomposer) processUnicastFlow(agent coreIf.LogicalDeviceAgent, route []graph.RouteHop,
 	inPortNo uint32, outPortNo uint32, flow *ofp.OfpFlowStats) *fu.DeviceRules {
 
-	log.Debugw("unicast-flow", log.Fields{"inPortNo": inPortNo, "outPortNo": outPortNo})
+	log.Debugw("decomposing-onu-flow-in-downstream-unicast-flow", log.Fields{"inPortNo": inPortNo, "outPortNo": outPortNo})
 	deviceRules := fu.NewDeviceRules()
 
-	ingressHop := route[0]
 	egressHop := route[1]
 
-	var actions []ofp.OfpActionType
-	var isOutputTypeInActions bool
-	for _, action := range fu.GetActions(flow) {
-		actions = append(actions, action.Type)
-		if !isOutputTypeInActions && action.Type == fu.OUTPUT {
-			isOutputTypeInActions = true
-		}
+	meterId := fu.GetMeterIdFromFlow(flow)
+	metadataFromwriteMetadata := fu.GetMetadataFromWriteMetadataAction(flow)
+	var fa *fu.FlowArgs
+	fa = &fu.FlowArgs{
+		KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie, "meter_id": uint64(meterId), "write_metadata": metadataFromwriteMetadata},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			fu.InPort(egressHop.Ingress),
+		},
 	}
-	if len(actions) == 1 && isOutputTypeInActions {
-		var fa *fu.FlowArgs
-		// Parent device flow
-		fa = &fu.FlowArgs{
-			KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
-			MatchFields: []*ofp.OfpOxmOfbField{
-				fu.InPort(ingressHop.Ingress),
-				fu.TunnelId(uint64(inPortNo)),
-			},
-			Actions: []*ofp.OfpAction{
-				fu.Output(ingressHop.Egress),
-			},
-		}
-		// Augment the matchfields with the ofpfields from the flow
-		fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT)...)
+	// Augment the matchfields with the ofpfields from the flow
+	fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT)...)
 
-		fg := fu.NewFlowsAndGroups()
-		fg.AddFlow(fu.MkFlowStat(fa))
-		deviceRules.AddFlowsAndGroup(ingressHop.DeviceID, fg)
+	// Augment the Actions
+	filteredAction := fu.GetActions(flow, fu.OUTPUT)
+	filteredAction = append(filteredAction, fu.Output(egressHop.Egress))
+	fa.Actions = filteredAction
 
-		// Child device flow
-		fa = &fu.FlowArgs{
-			KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
-			MatchFields: []*ofp.OfpOxmOfbField{
-				fu.InPort(egressHop.Ingress),
-			},
-			Actions: []*ofp.OfpAction{
-				fu.Output(egressHop.Egress),
-			},
-		}
-		// Augment the matchfields with the ofpfields from the flow
-		fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT)...)
-
-		fg = fu.NewFlowsAndGroups()
-		fg.AddFlow(fu.MkFlowStat(fa))
-		deviceRules.AddFlowsAndGroup(egressHop.DeviceID, fg)
-	} else {
-		var fa *fu.FlowArgs
-		fa = &fu.FlowArgs{
-			KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
-			MatchFields: []*ofp.OfpOxmOfbField{
-				fu.InPort(egressHop.Ingress),
-			},
-		}
-		// Augment the matchfields with the ofpfields from the flow
-		fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT)...)
-
-		// Augment the Actions
-		filteredAction := fu.GetActions(flow, fu.OUTPUT)
-		filteredAction = append(filteredAction, fu.Output(egressHop.Egress))
-		fa.Actions = filteredAction
-
-		fg := fu.NewFlowsAndGroups()
-		fg.AddFlow(fu.MkFlowStat(fa))
-		deviceRules.AddFlowsAndGroup(egressHop.DeviceID, fg)
-	}
+	fg := fu.NewFlowsAndGroups()
+	fg.AddFlow(fu.MkFlowStat(fa))
+	deviceRules.AddFlowsAndGroup(egressHop.DeviceID, fg)
 	return deviceRules
 }
 
@@ -559,14 +463,20 @@
 			return deviceRules
 		}
 		isUpstream := !ingressDevice.Root
-		if isUpstream {
+		if isUpstream { // Unicast OLT and ONU UL
+			log.Info("processOltAndOnuUpstreamNonControllerBoundUnicastFlows", log.Fields{"flows": flow})
 			deviceRules = fd.processUpstreamNonControllerBoundFlow(agent, route, inPortNo, outPortNo, flow)
-		} else if fu.HasNextTable(flow) {
+		} else if fu.HasNextTable(flow) && flow.TableId == 0 { // Unicast OLT flow DL
+			log.Debugw("processOltDownstreamNonControllerBoundFlowWithNextTable", log.Fields{"flows": flow})
 			deviceRules = fd.processDownstreamFlowWithNextTable(agent, route, inPortNo, outPortNo, flow)
-		} else if outPortNo != 0 { // Unicast
+		} else if flow.TableId == 1 && outPortNo != 0 { // Unicast ONU flow DL
+			log.Debugw("processOnuDownstreamUnicastFlow", log.Fields{"flows": flow})
 			deviceRules = fd.processUnicastFlow(agent, route, inPortNo, outPortNo, flow)
-		} else if grpId := fu.GetGroup(flow); grpId != 0 { //Multicast
+		} else if grpId := fu.GetGroup(flow); grpId != 0 && flow.TableId == 0 { //Multicast
+			log.Debugw("processMulticastFlow", log.Fields{"flows": flow})
 			deviceRules = fd.processMulticastFlow(agent, route, inPortNo, outPortNo, flow, grpId, groupMap)
+		} else {
+			log.Errorw("unknown-downstream-flow", log.Fields{"flow": *flow})
 		}
 	}
 	deviceRules = fd.updateOutputPortForControllerBoundFlowForParentDevide(flow, deviceRules)
diff --git a/rw_core/flow_decomposition/flow_decomposer_test.go b/rw_core/flow_decomposition/flow_decomposer_test.go
index 41a93e4..f4632cd 100644
--- a/rw_core/flow_decomposition/flow_decomposer_test.go
+++ b/rw_core/flow_decomposition/flow_decomposer_test.go
@@ -28,9 +28,18 @@
 )
 
 func init() {
-	log.AddPackage(log.JSON, log.WarnLevel, nil)
-	log.UpdateAllLoggers(log.Fields{"instanceId": "flow-decomposition"})
-	log.SetAllLogLevel(log.WarnLevel)
+	// Setup default logger - applies for packages that do not have specific logger set
+	if _, err := log.SetDefaultLogger(log.JSON, 0, log.Fields{"instanceId": 1}); err != nil {
+		log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
+	}
+
+	// Update all loggers (provisioned via init) with a common field
+	if err := log.UpdateAllLoggers(log.Fields{"instanceId": 1}); err != nil {
+		log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
+	}
+
+	// Update all loggers to log level specified as input parameter
+	log.SetAllLogLevel(0)
 }
 
 type testDeviceManager struct {
@@ -473,7 +482,6 @@
 		KV: fu.OfpFlowModArgs{"priority": 1000},
 		MatchFields: []*ofp.OfpOxmOfbField{
 			fu.InPort(1),
-			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
 			fu.EthType(0x888e),
 		},
 		Actions: []*ofp.OfpAction{
@@ -490,14 +498,13 @@
 	onu1FlowAndGroup := deviceRules.Rules["onu1"]
 	oltFlowAndGroup := deviceRules.Rules["olt"]
 	assert.Nil(t, onu1FlowAndGroup)
-	assert.Equal(t, 2, oltFlowAndGroup.Flows.Len())
+	assert.Equal(t, 1, oltFlowAndGroup.Flows.Len())
 	assert.Equal(t, 0, oltFlowAndGroup.Groups.Len())
 
 	fa = &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 1000},
 		MatchFields: []*ofp.OfpOxmOfbField{
 			fu.InPort(1),
-			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 1),
 			fu.TunnelId(uint64(1)),
 			fu.EthType(0x888e),
 		},
@@ -510,24 +517,6 @@
 	expectedOltFlow := fu.MkFlowStat(fa)
 	derivedFlow := oltFlowAndGroup.GetFlow(0)
 	assert.Equal(t, expectedOltFlow.String(), derivedFlow.String())
-
-	fa = &fu.FlowArgs{
-		KV: fu.OfpFlowModArgs{"priority": 1000},
-		MatchFields: []*ofp.OfpOxmOfbField{
-			fu.InPort(2),
-			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 4000),
-			fu.VlanPcp(0),
-			fu.Metadata_ofp(1),
-			fu.TunnelId(uint64(1)),
-		},
-		Actions: []*ofp.OfpAction{
-			fu.PopVlan(),
-			fu.Output(1),
-		},
-	}
-	expectedOltFlow = fu.MkFlowStat(fa)
-	derivedFlow = oltFlowAndGroup.GetFlow(1)
-	assert.Equal(t, expectedOltFlow.String(), derivedFlow.String())
 }
 
 func TestDhcpReRouteRuleDecomposition(t *testing.T) {
@@ -537,7 +526,6 @@
 		KV: fu.OfpFlowModArgs{"priority": 1000},
 		MatchFields: []*ofp.OfpOxmOfbField{
 			fu.InPort(1),
-			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
 			fu.EthType(0x0800),
 			fu.Ipv4Dst(0xffffffff),
 			fu.IpProto(17),
@@ -557,14 +545,13 @@
 	onu1FlowAndGroup := deviceRules.Rules["onu1"]
 	oltFlowAndGroup := deviceRules.Rules["olt"]
 	assert.Nil(t, onu1FlowAndGroup)
-	assert.Equal(t, 2, oltFlowAndGroup.Flows.Len())
+	assert.Equal(t, 1, oltFlowAndGroup.Flows.Len())
 	assert.Equal(t, 0, oltFlowAndGroup.Groups.Len())
 
 	fa = &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 1000},
 		MatchFields: []*ofp.OfpOxmOfbField{
 			fu.InPort(1),
-			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 1),
 			fu.TunnelId(uint64(1)),
 			fu.EthType(0x0800),
 			fu.Ipv4Dst(0xffffffff),
@@ -581,31 +568,12 @@
 	expectedOltFlow := fu.MkFlowStat(fa)
 	derivedFlow := oltFlowAndGroup.GetFlow(0)
 	assert.Equal(t, expectedOltFlow.String(), derivedFlow.String())
-
-	fa = &fu.FlowArgs{
-		KV: fu.OfpFlowModArgs{"priority": 1000},
-		MatchFields: []*ofp.OfpOxmOfbField{
-			fu.InPort(2),
-			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 4000),
-			fu.VlanPcp(0),
-			fu.Metadata_ofp(1),
-			fu.TunnelId(uint64(1)),
-		},
-		Actions: []*ofp.OfpAction{
-			fu.PopVlan(),
-			fu.Output(1),
-		},
-	}
-	expectedOltFlow = fu.MkFlowStat(fa)
-	derivedFlow = oltFlowAndGroup.GetFlow(1)
-	assert.Equal(t, expectedOltFlow.String(), derivedFlow.String())
 }
 
 func TestUnicastUpstreamRuleDecomposition(t *testing.T) {
-
 	var fa *fu.FlowArgs
 	fa = &fu.FlowArgs{
-		KV: fu.OfpFlowModArgs{"priority": 500, "table_id": 1},
+		KV: fu.OfpFlowModArgs{"priority": 5000, "table_id": 0},
 		MatchFields: []*ofp.OfpOxmOfbField{
 			fu.InPort(1),
 			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
@@ -618,7 +586,7 @@
 
 	var fa2 *fu.FlowArgs
 	fa2 = &fu.FlowArgs{
-		KV: fu.OfpFlowModArgs{"priority": 500},
+		KV: fu.OfpFlowModArgs{"priority": 500, "table_id": 1},
 		MatchFields: []*ofp.OfpOxmOfbField{
 			fu.InPort(1),
 			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 101),
@@ -633,19 +601,29 @@
 	}
 
 	flows := ofp.Flows{Items: []*ofp.OfpFlowStats{fu.MkFlowStat(fa), fu.MkFlowStat(fa2)}}
+	flows.Items[0].Instructions = []*ofp.OfpInstruction{{
+		Type: uint32(ofp.OfpInstructionType_OFPIT_GOTO_TABLE),
+		Data: &ofp.OfpInstruction_GotoTable{
+			GotoTable: &ofp.OfpInstructionGotoTable{
+				TableId: 1,
+			},
+		}}}
+
 	groups := ofp.FlowGroups{}
 	tfd := newTestFlowDecomposer(newTestDeviceManager())
 
 	deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups)
 	onu1FlowAndGroup := deviceRules.Rules["onu1"]
 	oltFlowAndGroup := deviceRules.Rules["olt"]
+	assert.NotNil(t, onu1FlowAndGroup)
+	assert.NotNil(t, onu1FlowAndGroup.Flows)
 	assert.Equal(t, 1, onu1FlowAndGroup.Flows.Len())
 	assert.Equal(t, 0, onu1FlowAndGroup.Groups.Len())
 	assert.Equal(t, 1, oltFlowAndGroup.Flows.Len())
 	assert.Equal(t, 0, oltFlowAndGroup.Groups.Len())
 
 	fa = &fu.FlowArgs{
-		KV: fu.OfpFlowModArgs{"priority": 500},
+		KV: fu.OfpFlowModArgs{"priority": 5000},
 		MatchFields: []*ofp.OfpOxmOfbField{
 			fu.InPort(2),
 			fu.TunnelId(uint64(1)),
@@ -657,8 +635,24 @@
 			fu.Output(1),
 		},
 	}
-	expectedOnu1Flow := fu.MkFlowStat(fa)
+
 	derivedFlow := onu1FlowAndGroup.GetFlow(0)
+	// Form the expected flow
+	expectedOnu1Flow := fu.MkFlowStat(fa)
+	expectedOnu1Flow.Instructions = []*ofp.OfpInstruction{{
+		Type: uint32(ofp.OfpInstructionType_OFPIT_APPLY_ACTIONS),
+		Data: &ofp.OfpInstruction_Actions{
+			Actions: &ofp.OfpInstructionActions{
+				Actions: []*ofp.OfpAction{{
+					Type: 0,
+					Action: &ofp.OfpAction_Output{
+						Output: &ofp.OfpActionOutput{
+							Port:   1,
+							MaxLen: 65509,
+						},
+					}}}}}}}
+
+	expectedOnu1Flow.Id = derivedFlow.Id //  Assign same flow ID as derived flowID to match completely
 	assert.Equal(t, expectedOnu1Flow.String(), derivedFlow.String())
 
 	fa = &fu.FlowArgs{
@@ -682,9 +676,10 @@
 }
 
 func TestUnicastDownstreamRuleDecomposition(t *testing.T) {
+	log.Debugf("Starting Test Unicast Downstream")
 	var fa1 *fu.FlowArgs
 	fa1 = &fu.FlowArgs{
-		KV: fu.OfpFlowModArgs{"priority": 500, "table_id": 1},
+		KV: fu.OfpFlowModArgs{"priority": 500, "table_id": 0},
 		MatchFields: []*ofp.OfpOxmOfbField{
 			fu.InPort(10),
 			fu.Metadata_ofp((1000 << 32) | 1),
@@ -697,7 +692,7 @@
 
 	var fa2 *fu.FlowArgs
 	fa2 = &fu.FlowArgs{
-		KV: fu.OfpFlowModArgs{"priority": 500},
+		KV: fu.OfpFlowModArgs{"priority": 500, "table_id": 1},
 		MatchFields: []*ofp.OfpOxmOfbField{
 			fu.InPort(10),
 			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 101),
@@ -710,10 +705,19 @@
 	}
 
 	flows := ofp.Flows{Items: []*ofp.OfpFlowStats{fu.MkFlowStat(fa1), fu.MkFlowStat(fa2)}}
+	flows.Items[0].Instructions = []*ofp.OfpInstruction{{
+		Type: uint32(ofp.OfpInstructionType_OFPIT_GOTO_TABLE),
+		Data: &ofp.OfpInstruction_GotoTable{
+			GotoTable: &ofp.OfpInstructionGotoTable{
+				TableId: 1,
+			},
+		}}}
+
 	groups := ofp.FlowGroups{}
 	tfd := newTestFlowDecomposer(newTestDeviceManager())
 
 	deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups)
+
 	onu1FlowAndGroup := deviceRules.Rules["onu1"]
 	oltFlowAndGroup := deviceRules.Rules["olt"]
 	assert.Equal(t, 1, onu1FlowAndGroup.Flows.Len())
@@ -725,8 +729,8 @@
 		KV: fu.OfpFlowModArgs{"priority": 500},
 		MatchFields: []*ofp.OfpOxmOfbField{
 			fu.InPort(2),
-			fu.Metadata_ofp(1000),
-			fu.TunnelId(uint64(1)),
+			fu.TunnelId(uint64(10)),
+			fu.Metadata_ofp(4294967296001),
 			fu.VlanPcp(0),
 		},
 		Actions: []*ofp.OfpAction{
@@ -734,8 +738,22 @@
 			fu.Output(1),
 		},
 	}
-	expectedOltFlow := fu.MkFlowStat(fa1)
+
 	derivedFlow := oltFlowAndGroup.GetFlow(0)
+	expectedOltFlow := fu.MkFlowStat(fa1)
+	expectedOltFlow.Instructions = []*ofp.OfpInstruction{{
+		Type: uint32(ofp.OfpInstructionType_OFPIT_APPLY_ACTIONS),
+		Data: &ofp.OfpInstruction_Actions{
+			Actions: &ofp.OfpInstructionActions{
+				Actions: []*ofp.OfpAction{{
+					Type: 0,
+					Action: &ofp.OfpAction_Output{
+						Output: &ofp.OfpActionOutput{
+							Port:   1,
+							MaxLen: 65509,
+						},
+					}}}}}}}
+	expectedOltFlow.Id = derivedFlow.Id
 	assert.Equal(t, expectedOltFlow.String(), derivedFlow.String())
 
 	fa1 = &fu.FlowArgs{
diff --git a/rw_core/utils/flow_utils.go b/rw_core/utils/flow_utils.go
index 3828b39..4293126 100644
--- a/rw_core/utils/flow_utils.go
+++ b/rw_core/utils/flow_utils.go
@@ -29,7 +29,9 @@
 
 var (
 	// Instructions shortcut
-	APPLY_ACTIONS = ofp.OfpInstructionType_OFPIT_APPLY_ACTIONS
+	APPLY_ACTIONS  = ofp.OfpInstructionType_OFPIT_APPLY_ACTIONS
+	WRITE_METADATA = ofp.OfpInstructionType_OFPIT_WRITE_METADATA
+	METER_ACTION   = ofp.OfpInstructionType_OFPIT_METER
 
 	//OFPAT_* shortcuts
 	OUTPUT       = ofp.OfpActionType_OFPAT_OUTPUT
@@ -456,6 +458,22 @@
 	return 0
 }
 
+func GetMeterId(flow *ofp.OfpFlowStats) uint32 {
+	if flow == nil {
+		return 0
+	}
+	for _, instruction := range flow.Instructions {
+		if instruction.Type == uint32(ofp.OfpInstructionType_OFPIT_METER) {
+			MeterInstruction := instruction.GetMeter()
+			if MeterInstruction == nil {
+				return 0
+			}
+			return MeterInstruction.GetMeterId()
+		}
+	}
+	return 0
+}
+
 func GetTunnelId(flow *ofp.OfpFlowStats) uint64 {
 	if flow == nil {
 		return 0
@@ -475,9 +493,10 @@
 	}
 	for _, field := range GetOfbFields(flow) {
 		if field.Type == METADATA {
-			return uint32(field.GetTableMetadata() & 0xffffffff)
+			return uint32(field.GetTableMetadata() & 0xFFFFFFFF)
 		}
 	}
+	log.Debug("No-metadata-present")
 	return 0
 }
 
@@ -490,28 +509,83 @@
 			return field.GetTableMetadata()
 		}
 	}
+	log.Debug("No-metadata-present")
 	return 0
 }
 
-// GetPortNumberFromMetadata retrieves the port number from the Metadata_ofp. The port number (UNI on ONU) is in the
-// lower 32-bits of Metadata_ofp and the inner_tag is in the upper 32-bits. This is set in the ONOS OltPipeline as
-// a Metadata_ofp field
-func GetPortNumberFromMetadata(flow *ofp.OfpFlowStats) uint64 {
-	md := GetMetaData64Bit(flow)
-	if md == 0 {
-		return 0
+// function returns write metadata value from write_metadata action field
+func GetMetadataFromWriteMetadataAction(flow *ofp.OfpFlowStats) uint64 {
+	if flow != nil {
+		for _, instruction := range flow.Instructions {
+			if instruction.Type == uint32(WRITE_METADATA) {
+				if writeMetadata := instruction.GetWriteMetadata(); writeMetadata != nil {
+					return writeMetadata.GetMetadata()
+				}
+			}
+		}
 	}
-	if md <= 0xffffffff {
-		log.Debugw("onos-upgrade-suggested", log.Fields{"Metadata_ofp": md, "message": "Legacy MetaData detected form OltPipeline"})
-		return md
+	log.Debugw("No-write-metadata-present", log.Fields{"flow": flow})
+	return 0
+}
+
+func GetTechProfileIDFromWriteMetaData(metadata uint64) uint16 {
+	/*
+	   Write metadata instruction value (metadata) is 8 bytes:
+	   MS 2 bytes: C Tag
+	   Next 2 bytes: Technology Profile Id
+	   Next 4 bytes: Port number (uni or nni)
+
+	   This is set in the ONOS OltPipeline as a write metadata instruction
+	*/
+	var tpId uint16 = 0
+	log.Debugw("Write metadata value for Techprofile ID", log.Fields{"metadata": metadata})
+	if metadata != 0 {
+		tpId = uint16((metadata >> 32) & 0xFFFF)
+		log.Debugw("Found techprofile ID from write metadata action", log.Fields{"tpid": tpId})
 	}
-	return md & 0xffffffff
+	return tpId
+}
+
+func GetEgressPortNumberFromWriteMetadata(flow *ofp.OfpFlowStats) uint32 {
+	/*
+			  Write metadata instruction value (metadata) is 8 bytes:
+		    	MS 2 bytes: C Tag
+		    	Next 2 bytes: Technology Profile Id
+		    	Next 4 bytes: Port number (uni or nni)
+		    	This is set in the ONOS OltPipeline as a write metadata instruction
+	*/
+	var uniPort uint32 = 0
+	md := GetMetadataFromWriteMetadataAction(flow)
+	log.Debugw("Metadata found for egress/uni port ", log.Fields{"metadata": md})
+	if md != 0 {
+		uniPort = uint32(md & 0xFFFFFFFF)
+		log.Debugw("Found EgressPort from write metadata action", log.Fields{"egress_port": uniPort})
+	}
+	return uniPort
+
+}
+
+func GetInnerTagFromMetaData(flow *ofp.OfpFlowStats) uint16 {
+	/*
+			  Write metadata instruction value (metadata) is 8 bytes:
+		    	MS 2 bytes: C Tag
+		    	Next 2 bytes: Technology Profile Id
+		    	Next 4 bytes: Port number (uni or nni)
+		    	This is set in the ONOS OltPipeline as a write metadata instruction
+	*/
+	var innerTag uint16 = 0
+	md := GetMetadataFromWriteMetadataAction(flow)
+	if md != 0 {
+		innerTag = uint16((md >> 48) & 0xFFFF)
+		log.Debugw("Found  CVLAN from write metadate action", log.Fields{"c_vlan": innerTag})
+	}
+	return innerTag
 }
 
 //GetInnerTagFromMetaData retrieves the inner tag from the Metadata_ofp. The port number (UNI on ONU) is in the
 // lower 32-bits of Metadata_ofp and the inner_tag is in the upper 32-bits. This is set in the ONOS OltPipeline as
 //// a Metadata_ofp field
-func GetInnerTagFromMetaData(flow *ofp.OfpFlowStats) uint64 {
+/*func GetInnerTagFromMetaData(flow *ofp.OfpFlowStats) uint64 {
 	md := GetMetaData64Bit(flow)
 	if md == 0 {
 		return 0
@@ -521,7 +595,7 @@
 		return md
 	}
 	return (md >> 32) & 0xffffffff
-}
+}*/
 
 // Extract the child device port from a flow that contains the parent device peer port.  Typically the UNI port of an
 // ONU child device.  Per TST agreement this will be the lower 32 bits of tunnel id reserving upper 32 bits for later
@@ -571,6 +645,23 @@
 	return nil
 }
 
+// GetMeterIdFlowModArgs returns the meterId if the "meter_id" is present in the map, otherwise return 0
+func GetMeterIdFlowModArgs(kw OfpFlowModArgs) uint32 {
+	if val, exist := kw["meter_id"]; exist {
+		return uint32(val)
+	}
+	return 0
+}
+
+// Function returns the metadata if the "write_metadata" is present in the map, otherwise return nil
+func GetMetadataFlowModArgs(kw OfpFlowModArgs) uint64 {
+	if val, exist := kw["write_metadata"]; exist {
+		ret := uint64(val)
+		return ret
+	}
+	return 0
+}
+
 // Return unique 64-bit integer hash for flow covering the following attributes:
 // 'table_id', 'priority', 'flags', 'cookie', 'match', '_instruction_string'
 func HashFlowStats(flow *ofp.OfpFlowStats) uint64 {
@@ -619,6 +710,53 @@
 	return group
 }
 
+// flowStatsEntryFromFlowModMessage maps an ofp_flow_mod message to an ofp_flow_stats message
+func MeterEntryFromMeterMod(meterMod *ofp.OfpMeterMod) *ofp.OfpMeterEntry {
+	bandStats := make([]*ofp.OfpMeterBandStats, 0)
+	meter := &ofp.OfpMeterEntry{Config: &ofp.OfpMeterConfig{},
+		Stats: &ofp.OfpMeterStats{BandStats: bandStats}}
+	if meterMod == nil {
+		log.Error("Invalid meter mod command")
+		return meter
+	}
+	// config init
+	meter.Config.MeterId = meterMod.MeterId
+	meter.Config.Flags = meterMod.Flags
+	meter.Config.Bands = meterMod.Bands
+	// meter stats init
+	meter.Stats.MeterId = meterMod.MeterId
+	meter.Stats.FlowCount = 0
+	meter.Stats.PacketInCount = 0
+	meter.Stats.ByteInCount = 0
+	meter.Stats.DurationSec = 0
+	meter.Stats.DurationNsec = 0
+	// band stats init
+	for _, _ = range meterMod.Bands {
+		band := &ofp.OfpMeterBandStats{}
+		band.PacketBandCount = 0
+		band.ByteBandCount = 0
+		bandStats = append(bandStats, band)
+	}
+	meter.Stats.BandStats = bandStats
+	log.Debugw("Allocated meter entry", log.Fields{"meter": *meter})
+	return meter
+
+}
+
+func GetMeterIdFromFlow(flow *ofp.OfpFlowStats) uint32 {
+	if flow != nil {
+		for _, instruction := range flow.Instructions {
+			if instruction.Type == uint32(METER_ACTION) {
+				if meterInst := instruction.GetMeter(); meterInst != nil {
+					return meterInst.GetMeterId()
+				}
+			}
+		}
+	}
+
+	return uint32(0)
+}
+
 func MkOxmFields(matchFields []ofp.OfpOxmField) []*ofp.OfpOxmField {
 	oxmFields := make([]*ofp.OfpOxmField, 0)
 	for _, matchField := range matchFields {
@@ -653,6 +791,20 @@
 		inst := ofp.OfpInstruction{Type: uint32(ofp.OfpInstructionType_OFPIT_GOTO_TABLE), Data: &instGotoTable}
 		instructions = append(instructions, &inst)
 	}
+	// Process meter action
+	if meterId := GetMeterIdFlowModArgs(kw); meterId != 0 {
+		var instMeter ofp.OfpInstruction_Meter
+		instMeter.Meter = &ofp.OfpInstructionMeter{MeterId: meterId}
+		inst := ofp.OfpInstruction{Type: uint32(METER_ACTION), Data: &instMeter}
+		instructions = append(instructions, &inst)
+	}
+	//process write_metadata action
+	if metadata := GetMetadataFlowModArgs(kw); metadata != 0 {
+		var instWriteMetadata ofp.OfpInstruction_WriteMetadata
+		instWriteMetadata.WriteMetadata = &ofp.OfpInstructionWriteMetadata{Metadata: metadata}
+		inst := ofp.OfpInstruction{Type: uint32(WRITE_METADATA), Data: &instWriteMetadata}
+		instructions = append(instructions, &inst)
+	}
 
 	// Process match fields
 	oxmFields := make([]*ofp.OfpOxmField, 0)
@@ -745,7 +897,7 @@
 
 // MkFlowStat is a helper method to build flows
 func MkFlowStat(fa *FlowArgs) *ofp.OfpFlowStats {
-	//Build the matchfields
+	//Build the match-fields
 	matchFields := make([]*ofp.OfpOxmField, 0)
 	for _, val := range fa.MatchFields {
 		matchFields = append(matchFields, &ofp.OfpOxmField{Field: &ofp.OfpOxmField_OfbField{OfbField: val}})