[VOL-3001] Separating flows, groups and meters from LogicalDevice.
- This is to improve the performance of flow addition to system.
- This patch does not include separation of the flows from Device. It will be implemented in another patch.
- Flows, groups and meters are kept in store by their unique ids per logical device, and cached into a map with these unique ids per logical device again.
  Accessing to this store and map is synchronized by a RWLock.
  Also a lock is kept in memory per flow, meter and group to synchronize the modifications (add/modify/delete requests) per flow/meter/group.
Change-Id: Ic0135faef0bbd1664693375fa6527e0242919e6d
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index 940bf1c..901b27f 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -21,13 +21,14 @@
 	"encoding/hex"
 	"errors"
 	"fmt"
+	"reflect"
+	"sync"
+	"time"
+
 	"github.com/golang/protobuf/ptypes"
 	"github.com/opencord/voltha-go/rw_core/core/adapter"
 	"github.com/opencord/voltha-go/rw_core/core/device/remote"
 	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
-	"reflect"
-	"sync"
-	"time"
 
 	"github.com/gogo/protobuf/proto"
 	"github.com/opencord/voltha-go/db/model"
diff --git a/rw_core/core/device/logical_agent.go b/rw_core/core/device/logical_agent.go
index b87b81e..f032a65 100644
--- a/rw_core/core/device/logical_agent.go
+++ b/rw_core/core/device/logical_agent.go
@@ -21,6 +21,7 @@
 	"encoding/hex"
 	"errors"
 	"fmt"
+	"strconv"
 	"sync"
 	"time"
 
@@ -57,6 +58,34 @@
 	requestQueue       *coreutils.RequestQueue
 	startOnce          sync.Once
 	stopOnce           sync.Once
+
+	meters    map[uint32]*MeterChunk
+	meterLock sync.RWMutex
+	flows     map[uint64]*FlowChunk
+	flowLock  sync.RWMutex
+	groups    map[uint32]*GroupChunk
+	groupLock sync.RWMutex
+}
+
+//MeterChunk keeps a meter entry and its lock. The lock in the struct is used to syncronize the
+//modifications for the related meter.
+type MeterChunk struct {
+	meter *ofp.OfpMeterEntry
+	lock  sync.Mutex
+}
+
+//FlowChunk keeps a flow and the lock for this flow. The lock in the struct is used to syncronize the
+//modifications for the related flow.
+type FlowChunk struct {
+	flow *ofp.OfpFlowStats
+	lock sync.Mutex
+}
+
+//GroupChunk keeps a group entry and its lock. The lock in the struct is used to syncronize the
+//modifications for the related group.
+type GroupChunk struct {
+	group *ofp.OfpGroupEntry
+	lock  sync.Mutex
 }
 
 func newLogicalDeviceAgent(id string, sn string, deviceID string, ldeviceMgr *LogicalManager,
@@ -72,6 +101,9 @@
 	agent.logicalPortsNo = make(map[uint32]bool)
 	agent.defaultTimeout = timeout
 	agent.requestQueue = coreutils.NewRequestQueue()
+	agent.meters = make(map[uint32]*MeterChunk)
+	agent.flows = make(map[uint64]*FlowChunk)
+	agent.groups = make(map[uint32]*GroupChunk)
 	return &agent
 }
 
@@ -151,6 +183,10 @@
 
 		// Setup the local list of logical ports
 		agent.addLogicalPortsToMap(ld.Ports)
+		// load the flows, meters and groups from KV to cache
+		agent.loadFlows(ctx)
+		agent.loadMeters(ctx)
+		agent.loadGroups(ctx)
 	}
 
 	// Setup the device routes. Building routes may fail if the pre-conditions are not satisfied (e.g. no PON ports present)
@@ -205,43 +241,39 @@
 // ListLogicalDeviceFlows returns logical device flows
 func (agent *LogicalAgent) ListLogicalDeviceFlows(ctx context.Context) (*ofp.Flows, error) {
 	logger.Debug("ListLogicalDeviceFlows")
-
-	logicalDevice, err := agent.GetLogicalDevice(ctx)
-	if err != nil {
-		return nil, err
+	var flowStats []*ofp.OfpFlowStats
+	agent.flowLock.RLock()
+	defer agent.flowLock.RUnlock()
+	for _, flowChunk := range agent.flows {
+		flowStats = append(flowStats, (proto.Clone(flowChunk.flow)).(*ofp.OfpFlowStats))
 	}
-	if logicalDevice.Flows == nil {
-		return &ofp.Flows{}, nil
-	}
-	return (proto.Clone(logicalDevice.Flows)).(*ofp.Flows), nil
+	return &ofp.Flows{Items: flowStats}, nil
 }
 
 // ListLogicalDeviceMeters returns logical device meters
 func (agent *LogicalAgent) ListLogicalDeviceMeters(ctx context.Context) (*ofp.Meters, error) {
 	logger.Debug("ListLogicalDeviceMeters")
 
-	logicalDevice, err := agent.GetLogicalDevice(ctx)
-	if err != nil {
-		return nil, err
+	var meterEntries []*ofp.OfpMeterEntry
+	agent.meterLock.RLock()
+	defer agent.meterLock.RUnlock()
+	for _, meterChunk := range agent.meters {
+		meterEntries = append(meterEntries, (proto.Clone(meterChunk.meter)).(*ofp.OfpMeterEntry))
 	}
-	if logicalDevice.Meters == nil {
-		return &ofp.Meters{}, nil
-	}
-	return (proto.Clone(logicalDevice.Meters)).(*ofp.Meters), nil
+	return &ofp.Meters{Items: meterEntries}, nil
 }
 
 // ListLogicalDeviceFlowGroups returns logical device flow groups
 func (agent *LogicalAgent) ListLogicalDeviceFlowGroups(ctx context.Context) (*ofp.FlowGroups, error) {
 	logger.Debug("ListLogicalDeviceFlowGroups")
 
-	logicalDevice, err := agent.GetLogicalDevice(ctx)
-	if err != nil {
-		return nil, err
+	var groupEntries []*ofp.OfpGroupEntry
+	agent.groupLock.RLock()
+	defer agent.groupLock.RUnlock()
+	for _, value := range agent.groups {
+		groupEntries = append(groupEntries, (proto.Clone(value.group)).(*ofp.OfpGroupEntry))
 	}
-	if logicalDevice.FlowGroups == nil {
-		return &ofp.FlowGroups{}, nil
-	}
-	return (proto.Clone(logicalDevice.FlowGroups)).(*ofp.FlowGroups), nil
+	return &ofp.FlowGroups{Items: groupEntries}, nil
 }
 
 // ListLogicalDevicePorts returns logical device ports
@@ -259,45 +291,77 @@
 	return &voltha.LogicalPorts{Items: lPorts}, nil
 }
 
-//updateLogicalDeviceFlowsWithoutLock updates the logical device with the latest flows in the model.
-func (agent *LogicalAgent) updateLogicalDeviceFlowsWithoutLock(ctx context.Context, flows *ofp.Flows) error {
-	ld := agent.getLogicalDeviceWithoutLock()
-
-	logger.Debugw("logical-device-before", log.Fields{"lports": len(ld.Ports)})
-	ld.Flows = flows
-
-	if err := agent.updateLogicalDeviceWithoutLock(ctx, ld); err != nil {
-		logger.Errorw("error-updating-logical-device-with-flows", log.Fields{"error": err})
-		return err
+//updateLogicalDeviceFlow updates flow in the store and cache
+//It is assumed that the chunk lock has been acquired before this function is called
+func (agent *LogicalAgent) updateLogicalDeviceFlow(ctx context.Context, flow *ofp.OfpFlowStats, flowChunk *FlowChunk) error {
+	path := fmt.Sprintf("logical_flows/%s/%d", agent.logicalDeviceID, flow.Id)
+	if err := agent.clusterDataProxy.Update(ctx, path, flow); err != nil {
+		return status.Errorf(codes.Internal, "failed-update-flow:%s:%d %s", agent.logicalDeviceID, flow.Id, err)
 	}
+	flowChunk.flow = flow
 	return nil
 }
 
-//updateLogicalDeviceMetersWithoutLock updates the logical device with the meters info
-func (agent *LogicalAgent) updateLogicalDeviceMetersWithoutLock(ctx context.Context, meters *ofp.Meters) error {
-	ld := agent.getLogicalDeviceWithoutLock()
+//removeLogicalDeviceFlow deletes the flow from store and cache.
+//It is assumed that the chunk lock has been acquired before this function is called
+func (agent *LogicalAgent) removeLogicalDeviceFlow(ctx context.Context, flowID uint64) error {
+	path := fmt.Sprintf("logical_flows/%s/%d", agent.logicalDeviceID, flowID)
+	if err := agent.clusterDataProxy.Remove(ctx, path); err != nil {
+		return fmt.Errorf("couldnt-delete-flow-from-the-store-%s", path)
+	}
+	agent.flowLock.Lock()
+	defer agent.flowLock.Unlock()
+	delete(agent.flows, flowID)
+	return nil
+}
 
-	logger.Debugw("logical-device-before", log.Fields{"lports": len(ld.Ports)})
-	ld.Meters = meters
-
-	if err := agent.updateLogicalDeviceWithoutLock(ctx, ld); err != nil {
+//updateLogicalDeviceMeter updates meter info in store and cache
+//It is assumed that the chunk lock has been acquired before this function is called
+func (agent *LogicalAgent) updateLogicalDeviceMeter(ctx context.Context, meter *ofp.OfpMeterEntry, meterChunk *MeterChunk) error {
+	path := fmt.Sprintf("meters/%s/%d", agent.logicalDeviceID, meter.Config.MeterId)
+	if err := agent.clusterDataProxy.Update(ctx, path, meter); err != nil {
 		logger.Errorw("error-updating-logical-device-with-meters", log.Fields{"error": err})
 		return err
 	}
+	meterChunk.meter = meter
 	return nil
 }
 
-//updateLogicalDeviceFlowGroupsWithoutLock updates the logical device with the flow groups
-func (agent *LogicalAgent) updateLogicalDeviceFlowGroupsWithoutLock(ctx context.Context, flowGroups *ofp.FlowGroups) error {
-	ld := agent.getLogicalDeviceWithoutLock()
+//removeLogicalDeviceMeter deletes the meter from store and cache
+//It is assumed that the chunk lock has been acquired before this function is called
+func (agent *LogicalAgent) removeLogicalDeviceMeter(ctx context.Context, meterID uint32) error {
+	path := fmt.Sprintf("meters/%s/%d", agent.logicalDeviceID, meterID)
+	if err := agent.clusterDataProxy.Remove(ctx, path); err != nil {
+		return fmt.Errorf("couldnt-delete-meter-from-store-%s", path)
+	}
+	agent.meterLock.Lock()
+	defer agent.meterLock.Unlock()
+	delete(agent.meters, meterID)
+	return nil
+}
 
-	logger.Debugw("logical-device-before", log.Fields{"lports": len(ld.Ports)})
-	ld.FlowGroups = flowGroups
-
-	if err := agent.updateLogicalDeviceWithoutLock(ctx, ld); err != nil {
-		logger.Errorw("error-updating-logical-device-with-flowgroups", log.Fields{"error": err})
+//updateLogicalDeviceFlowGroup updates the flow groups in store and cache
+//It is assumed that the chunk lock has been acquired before this function is called
+func (agent *LogicalAgent) updateLogicalDeviceFlowGroup(ctx context.Context, groupEntry *ofp.OfpGroupEntry, groupChunk *GroupChunk) error {
+	path := fmt.Sprintf("groups/%s/%d", agent.logicalDeviceID, groupEntry.Desc.GroupId)
+	if err := agent.clusterDataProxy.Update(ctx, path, groupEntry); err != nil {
+		logger.Errorw("error-updating-logical-device-with-group", log.Fields{"error": err})
 		return err
 	}
+	groupChunk.group = groupEntry
+	return nil
+}
+
+//removeLogicalDeviceFlowGroup removes the flow groups in store and cache
+//It is assumed that the chunk lock has been acquired before this function is called
+func (agent *LogicalAgent) removeLogicalDeviceFlowGroup(ctx context.Context, groupID uint32) error {
+	path := fmt.Sprintf("groups/%s/%d", agent.logicalDeviceID, groupID)
+	if err := agent.clusterDataProxy.Remove(ctx, path); err != nil {
+		return fmt.Errorf("couldnt-delete-group-from-store-%s", path)
+	}
+	agent.groupLock.Lock()
+	defer agent.groupLock.Unlock()
+	delete(agent.groups, groupID)
 	return nil
 }
 
@@ -490,14 +554,6 @@
 	return proto.Clone(&voltha.LogicalPorts{Items: ports}).(*voltha.LogicalPorts).Items
 }
 
-func cloneFlows(flows []*ofp.OfpFlowStats) []*ofp.OfpFlowStats {
-	return proto.Clone(&ofp.Flows{Items: flows}).(*ofp.Flows).Items
-}
-
-func cloneMeters(meters []*ofp.OfpMeterEntry) []*ofp.OfpMeterEntry {
-	return proto.Clone(&ofp.Meters{Items: meters}).(*ofp.Meters).Items
-}
-
 //updateLogicalDevicePortsWithoutLock updates the
 func (agent *LogicalAgent) updateLogicalDevicePortsWithoutLock(ctx context.Context, device *voltha.LogicalDevice, newPorts []*voltha.LogicalPort) error {
 	oldPorts := device.Ports
@@ -512,7 +568,7 @@
 //updateLogicalDeviceWithoutLock updates the model with the logical device.  It clones the logicaldevice before saving it
 func (agent *LogicalAgent) updateLogicalDeviceWithoutLock(ctx context.Context, logicalDevice *voltha.LogicalDevice) error {
 	if agent.stopped {
-		return errors.New("logical device agent stopped")
+		return fmt.Errorf("logical device agent stopped-%s", logicalDevice.Id)
 	}
 
 	updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
@@ -556,6 +612,7 @@
 	if flow == nil {
 		return nil
 	}
+
 	if err := agent.generateDeviceRoutesIfNeeded(ctx); err != nil {
 		return err
 	}
@@ -581,6 +638,7 @@
 	if groupMod == nil {
 		return nil
 	}
+
 	if err := agent.generateDeviceRoutesIfNeeded(ctx); err != nil {
 		return err
 	}
@@ -621,34 +679,36 @@
 	if meterMod == nil {
 		return nil
 	}
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		return err
-	}
-	defer agent.requestQueue.RequestComplete()
-	logger.Debug("Acquired logical device lock")
-	lDevice := agent.getLogicalDeviceWithoutLock()
-
-	var meters []*ofp.OfpMeterEntry
-	if lDevice.Meters != nil && lDevice.Meters.Items != nil {
-		meters = lDevice.Meters.Items
-	}
-	logger.Debugw("Available meters", log.Fields{"meters": meters})
-
-	for _, meter := range meters {
-		if meterMod.MeterId == meter.Config.MeterId {
-			logger.Infow("Meter-already-exists", log.Fields{"meter": *meterMod})
-			return nil
-		}
-	}
 
 	meterEntry := fu.MeterEntryFromMeterMod(meterMod)
-	meters = append(meters, meterEntry)
-	//Update model
-	if err := agent.updateLogicalDeviceMetersWithoutLock(ctx, &ofp.Meters{Items: meters}); err != nil {
-		logger.Errorw("db-meter-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
+	agent.meterLock.Lock()
+	//check if the meter already exists or not
+	_, ok := agent.meters[meterMod.MeterId]
+	if ok {
+		logger.Infow("Meter-already-exists", log.Fields{"meter": *meterMod})
+		agent.meterLock.Unlock()
+		return nil
+	}
+
+	mChunk := MeterChunk{
+		meter: meterEntry,
+	}
+	//Add to map and acquire the per meter lock
+	agent.meters[meterMod.MeterId] = &mChunk
+	mChunk.lock.Lock()
+	defer mChunk.lock.Unlock()
+	agent.meterLock.Unlock()
+	meterID := strconv.Itoa(int(meterMod.MeterId))
+	if err := agent.clusterDataProxy.AddWithID(ctx, "meters/"+agent.logicalDeviceID, meterID, meterEntry); err != nil {
+		logger.Errorw("failed-adding-meter", log.Fields{"deviceID": agent.logicalDeviceID, "meterID": meterID, "err": err})
+		//Revert the map
+		agent.meterLock.Lock()
+		delete(agent.meters, meterMod.MeterId)
+		agent.meterLock.Unlock()
 		return err
 	}
-	logger.Debugw("Meter-added-successfully", log.Fields{"Added-meter": meterEntry, "updated-meters": lDevice.Meters})
+
+	logger.Debugw("Meter-added-successfully", log.Fields{"Added-meter": meterEntry})
 	return nil
 }
 
@@ -657,56 +717,25 @@
 	if meterMod == nil {
 		return nil
 	}
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		return err
-	}
-	defer agent.requestQueue.RequestComplete()
-
-	lDevice := agent.getLogicalDeviceWithoutLock()
-
-	var meters []*ofp.OfpMeterEntry
-	var flows []*ofp.OfpFlowStats
-	updatedFlows := make([]*ofp.OfpFlowStats, 0)
-	if lDevice.Meters != nil && lDevice.Meters.Items != nil {
-		meters = lDevice.Meters.Items
-	}
-
-	changedMeter := false
-	changedFow := false
-	logger.Debugw("Available meters", log.Fields{"meters": meters})
-	for index, meter := range meters {
-		if meterMod.MeterId == meter.Config.MeterId {
-			flows = lDevice.Flows.Items
-			changedFow, updatedFlows = agent.getUpdatedFlowsAfterDeletebyMeterID(flows, meterMod.MeterId)
-			meters = append(meters[:index], meters[index+1:]...)
-			logger.Debugw("Meter has been deleted", log.Fields{"meter": meter, "index": index})
-			changedMeter = true
-			break
-		}
-	}
-	if changedMeter {
-		//Update model
-		metersToUpdate := &ofp.Meters{}
-		if lDevice.Meters != nil {
-			metersToUpdate = &ofp.Meters{Items: meters}
-		}
-		if err := agent.updateLogicalDeviceMetersWithoutLock(ctx, metersToUpdate); err != nil {
-			logger.Errorw("db-meter-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
+	agent.meterLock.RLock()
+	meterChunk, ok := agent.meters[meterMod.MeterId]
+	agent.meterLock.RUnlock()
+	if ok {
+		//Dont let anyone to do any changes to this meter until this is done.
+		//And wait if someone else is already making modifications. Do this with per meter lock.
+		meterChunk.lock.Lock()
+		defer meterChunk.lock.Unlock()
+		if err := agent.deleteFlowsOfMeter(ctx, meterMod.MeterId); err != nil {
 			return err
 		}
-		logger.Debug("Meter-deleted-from-DB-successfully", log.Fields{"updatedMeters": metersToUpdate, "no-of-meter": len(metersToUpdate.Items)})
-
-	}
-	if changedFow {
-		//Update model
-		if err := agent.updateLogicalDeviceFlowsWithoutLock(ctx, &ofp.Flows{Items: updatedFlows}); err != nil {
-			logger.Errorw("db-flow-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
+		//remove from the store and cache
+		if err := agent.removeLogicalDeviceMeter(ctx, meterMod.MeterId); err != nil {
 			return err
 		}
-		logger.Debug("Flows-associated-with-meter-deleted-from-DB-successfully",
-			log.Fields{"updated-no-of-flows": len(updatedFlows), "meter": meterMod.MeterId})
+		logger.Debugw("meterDelete-success", log.Fields{"meterID": meterMod.MeterId})
+	} else {
+		logger.Warnw("meter-not-found", log.Fields{"meterID": meterMod.MeterId})
 	}
-	logger.Debugw("meterDelete success", log.Fields{"meterID": meterMod.MeterId})
 	return nil
 }
 
@@ -715,97 +744,95 @@
 	if meterMod == nil {
 		return nil
 	}
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+	newMeter := fu.MeterEntryFromMeterMod(meterMod)
+	agent.meterLock.RLock()
+	meterChunk, ok := agent.meters[newMeter.Config.MeterId]
+	agent.meterLock.RUnlock()
+	if !ok {
+		return fmt.Errorf("no-meter-to-modify:%d", newMeter.Config.MeterId)
+	}
+	//Release the map lock and syncronize per meter
+	meterChunk.lock.Lock()
+	defer meterChunk.lock.Unlock()
+	oldMeter := meterChunk.meter
+	newMeter.Stats.FlowCount = oldMeter.Stats.FlowCount
+
+	if err := agent.updateLogicalDeviceMeter(ctx, newMeter, meterChunk); err != nil {
+		logger.Errorw("db-meter-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "meterID": newMeter.Config.MeterId})
 		return err
 	}
-	defer agent.requestQueue.RequestComplete()
-
-	lDevice := agent.getLogicalDeviceWithoutLock()
-
-	var meters []*ofp.OfpMeterEntry
-	if lDevice.Meters != nil && lDevice.Meters.Items != nil {
-		meters = lDevice.Meters.Items
-	}
-	changedMeter := false
-	for index, meter := range meters {
-		if meterMod.MeterId == meter.Config.MeterId {
-			newmeterEntry := fu.MeterEntryFromMeterMod(meterMod)
-			newmeterEntry.Stats.FlowCount = meter.Stats.FlowCount
-			meters[index] = newmeterEntry
-			changedMeter = true
-			logger.Debugw("Found meter, replaced with new meter", log.Fields{"old meter": meter, "new meter": newmeterEntry})
-			break
-		}
-	}
-	if changedMeter {
-		//Update model
-		metersToUpdate := &ofp.Meters{}
-		if lDevice.Meters != nil {
-			metersToUpdate = &ofp.Meters{Items: meters}
-		}
-		if err := agent.updateLogicalDeviceMetersWithoutLock(ctx, metersToUpdate); err != nil {
-			logger.Errorw("db-meter-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
-			return err
-		}
-		logger.Debugw("meter-updated-in-DB-successfully", log.Fields{"updated_meters": meters})
-		return nil
-	}
-
-	logger.Errorw("Meter not found ", log.Fields{"meter": meterMod})
-	return fmt.Errorf("no-logical-device-present:%d", meterMod.MeterId)
+	logger.Debugw("replaced-with-new-meter", log.Fields{"oldMeter": oldMeter, "newMeter": newMeter})
+	return nil
 
 }
 
-func (agent *LogicalAgent) getUpdatedFlowsAfterDeletebyMeterID(flows []*ofp.OfpFlowStats, meterID uint32) (bool, []*ofp.OfpFlowStats) {
-	logger.Infow("Delete flows matching meter", log.Fields{"meter": meterID})
-	changed := false
-	//updatedFlows := make([]*ofp.OfpFlowStats, 0)
-	for index := len(flows) - 1; index >= 0; index-- {
-		if mID := fu.GetMeterIdFromFlow(flows[index]); mID != 0 && mID == meterID {
-			logger.Debugw("Flow to be deleted", log.Fields{"flow": flows[index], "index": index})
-			flows = append(flows[:index], flows[index+1:]...)
-			changed = true
+func (agent *LogicalAgent) deleteFlowsOfMeter(ctx context.Context, meterID uint32) error {
+	logger.Infow("Delete-flows-matching-meter", log.Fields{"meter": meterID})
+	agent.flowLock.Lock()
+	defer agent.flowLock.Unlock()
+	for flowID, flowChunk := range agent.flows {
+		if mID := fu.GetMeterIdFromFlow(flowChunk.flow); mID != 0 && mID == meterID {
+			logger.Debugw("Flow-to-be- deleted", log.Fields{"flow": flowChunk.flow})
+			path := fmt.Sprintf("logical_flows/%s/%d", agent.logicalDeviceID, flowID)
+			if err := agent.clusterDataProxy.Remove(ctx, path); err != nil {
+				//TODO: Think on carrying on and deleting the remaining flows, instead of returning.
+				//Anyways this returns an error to controller which possibly results with a re-deletion.
+				//Then how can we handle the new deletion request(Same for group deletion)?
+				return fmt.Errorf("couldnt-deleted-flow-from-store-%s", path)
+			}
+			delete(agent.flows, flowID)
 		}
 	}
-	return changed, flows
+	return nil
 }
 
-func (agent *LogicalAgent) updateFlowCountOfMeterStats(modCommand *ofp.OfpFlowMod, meters []*ofp.OfpMeterEntry, flow *ofp.OfpFlowStats, revertUpdate bool) bool {
+func (agent *LogicalAgent) updateFlowCountOfMeterStats(ctx context.Context, modCommand *ofp.OfpFlowMod, flow *ofp.OfpFlowStats, revertUpdate bool) bool {
 
 	flowCommand := modCommand.GetCommand()
 	meterID := fu.GetMeterIdFromFlow(flow)
 	logger.Debugw("Meter-id-in-flow-mod", log.Fields{"meterId": meterID})
 	if meterID == 0 {
-		logger.Debugw("No meter present in the flow", log.Fields{"flow": *flow})
-		return false
+		logger.Debugw("No-meter-present-in-the-flow", log.Fields{"flow": *flow})
+		return true
 	}
-	if meters == nil {
-		logger.Debug("No meters present in logical device")
-		return false
+
+	if flowCommand != ofp.OfpFlowModCommand_OFPFC_ADD && flowCommand != ofp.OfpFlowModCommand_OFPFC_DELETE_STRICT {
+		return true
 	}
-	changedMeter := false
-	for _, meter := range meters {
-		if meterID == meter.Config.MeterId { // Found meter in Logicaldevice
-			if flowCommand == ofp.OfpFlowModCommand_OFPFC_ADD {
-				if revertUpdate {
-					meter.Stats.FlowCount--
-				} else {
-					meter.Stats.FlowCount++
-				}
-				changedMeter = true
-			} else if flowCommand == ofp.OfpFlowModCommand_OFPFC_DELETE_STRICT {
-				if revertUpdate {
-					meter.Stats.FlowCount++
-				} else {
-					meter.Stats.FlowCount--
-				}
-				changedMeter = true
-			}
-			logger.Debugw("Found meter, updated meter flow stats", log.Fields{" meterId": meterID})
-			break
+	agent.meterLock.RLock()
+	meterChunk, ok := agent.meters[meterID]
+	agent.meterLock.RUnlock()
+	if !ok {
+		logger.Debugw("Meter-is-not-present-in-logical-device", log.Fields{"meterID": meterID})
+		return true
+	}
+
+	//acquire the meter lock
+	meterChunk.lock.Lock()
+	defer meterChunk.lock.Unlock()
+
+	if flowCommand == ofp.OfpFlowModCommand_OFPFC_ADD {
+		if revertUpdate {
+			meterChunk.meter.Stats.FlowCount--
+		} else {
+			meterChunk.meter.Stats.FlowCount++
+		}
+	} else if flowCommand == ofp.OfpFlowModCommand_OFPFC_DELETE_STRICT {
+		if revertUpdate {
+			meterChunk.meter.Stats.FlowCount++
+		} else {
+			meterChunk.meter.Stats.FlowCount--
 		}
 	}
-	return changedMeter
+
+	//	Update store and cache
+	if err := agent.updateLogicalDeviceMeter(ctx, meterChunk.meter, meterChunk); err != nil {
+		logger.Debugw("unable-to-update-meter-in-db", log.Fields{"logicalDevice": agent.logicalDeviceID, "meterID": meterID})
+		return false
+	}
+
+	logger.Debugw("updated-meter-flow-stats", log.Fields{"meterId": meterID})
+	return true
 }
 
 //flowAdd adds a flow to the flow table of that logical device
@@ -814,29 +841,65 @@
 	if mod == nil {
 		return nil
 	}
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+	flow, err := fu.FlowStatsEntryFromFlowModMessage(mod)
+	if err != nil {
+		logger.Errorw("flowAdd-failed", log.Fields{"flowMod": mod, "err": err})
 		return err
 	}
-	defer agent.requestQueue.RequestComplete()
-
-	lDevice := agent.getLogicalDeviceWithoutLock()
-
-	var flows []*ofp.OfpFlowStats
-	var meters []*ofp.OfpMeterEntry
-	var flowToReplace *ofp.OfpFlowStats
-	var flow *ofp.OfpFlowStats
-	var err error
-
-	if lDevice.Flows != nil && lDevice.Flows.Items != nil {
-		flows = lDevice.Flows.Items
+	var updated bool
+	var changed bool
+	if changed, updated, err = agent.decomposeAndAdd(ctx, flow, mod); err != nil {
+		logger.Errorw("flow-decompose-and-add-failed ", log.Fields{"flowMod": mod, "err": err})
+		return err
 	}
-
-	if lDevice.Meters != nil && lDevice.Meters.Items != nil {
-		meters = lDevice.Meters.Items
+	if changed && !updated {
+		if dbupdated := agent.updateFlowCountOfMeterStats(ctx, mod, flow, false); !dbupdated {
+			return fmt.Errorf("couldnt-updated-flow-stats-%s", strconv.FormatUint(flow.Id, 10))
+		}
 	}
-	updatedFlows := make([]*ofp.OfpFlowStats, 0)
+	return nil
+
+}
+
+func (agent *LogicalAgent) decomposeAndAdd(ctx context.Context, flow *ofp.OfpFlowStats, mod *ofp.OfpFlowMod) (bool, bool, error) {
 	changed := false
 	updated := false
+	alreadyExist := true
+	var flowToReplace *ofp.OfpFlowStats
+
+	//if flow is not found in the map, create a new entry, otherwise get the existing one.
+	agent.flowLock.Lock()
+	flowChunk, ok := agent.flows[flow.Id]
+	if !ok {
+		flowChunk = &FlowChunk{
+			flow: flow,
+		}
+		agent.flows[flow.Id] = flowChunk
+		alreadyExist = false
+		flowChunk.lock.Lock() //acquire chunk lock before releasing map lock
+		defer flowChunk.lock.Unlock()
+		agent.flowLock.Unlock()
+	} else {
+		agent.flowLock.Unlock() //release map lock before acquiring chunk lock
+		flowChunk.lock.Lock()
+		defer flowChunk.lock.Unlock()
+	}
+
+	if !alreadyExist {
+		flowID := strconv.FormatUint(flow.Id, 10)
+		if err := agent.clusterDataProxy.AddWithID(ctx, "logical_flows/"+agent.logicalDeviceID, flowID, flow); err != nil {
+			logger.Errorw("failed-adding-flow-to-db", log.Fields{"deviceID": agent.logicalDeviceID, "flowID": flowID, "err": err})
+			//Revert the map
+			//TODO: Solve the condition:If we have two flow Adds of the same flow (at least same priority and match) in quick succession
+			//then if the first one fails while the second one was waiting on the flowchunk, we will end up with an instance of flowChunk that is no longer in the map.
+			agent.flowLock.Lock()
+			delete(agent.flows, flow.Id)
+			agent.flowLock.Unlock()
+			return changed, updated, err
+		}
+	}
+	flows := make([]*ofp.OfpFlowStats, 0)
+	updatedFlows := make([]*ofp.OfpFlowStats, 0)
 	checkOverlap := (mod.Flags & uint32(ofp.OfpFlowModFlags_OFPFF_CHECK_OVERLAP)) != 0
 	if checkOverlap {
 		if overlapped := fu.FindOverlappingFlows(flows, mod); len(overlapped) != 0 {
@@ -844,77 +907,46 @@
 			logger.Warnw("overlapped-flows", log.Fields{"logicaldeviceId": agent.logicalDeviceID})
 		} else {
 			//	Add flow
-			flow, err = fu.FlowStatsEntryFromFlowModMessage(mod)
-			if err != nil {
-				return err
-			}
-			flows = append(flows, flow)
-			updatedFlows = append(updatedFlows, flow)
 			changed = true
 		}
 	} else {
-		flow, err = fu.FlowStatsEntryFromFlowModMessage(mod)
-		if err != nil {
-			return err
-		}
-		idx := fu.FindFlows(flows, flow)
-		if idx >= 0 {
-			flowToReplace = flows[idx]
+		if alreadyExist {
+			flowToReplace = flowChunk.flow
 			if (mod.Flags & uint32(ofp.OfpFlowModFlags_OFPFF_RESET_COUNTS)) != 0 {
 				flow.ByteCount = flowToReplace.ByteCount
 				flow.PacketCount = flowToReplace.PacketCount
 			}
 			if !proto.Equal(flowToReplace, flow) {
-				flows[idx] = flow
-				updatedFlows = append(updatedFlows, flow)
 				changed = true
 				updated = true
 			}
 		} else {
-			flows = append(flows, flow)
-			updatedFlows = append(updatedFlows, flow)
 			changed = true
 		}
 	}
-	logger.Debugw("flowAdd-changed", log.Fields{"changed": changed})
-
+	logger.Debugw("flowAdd-changed", log.Fields{"changed": changed, "updated": updated})
 	if changed {
+		updatedFlows = append(updatedFlows, flow)
 		var flowMetadata voltha.FlowMetadata
-		if err := agent.GetMeterConfig(updatedFlows, meters, &flowMetadata); err != nil { // This should never happen,meters should be installed before flow arrives
-			logger.Error("Meter-referred-in-flows-not-present")
-			return err
+		lMeters, _ := agent.ListLogicalDeviceMeters(ctx)
+		if err := agent.GetMeterConfig(updatedFlows, lMeters.Items, &flowMetadata); err != nil {
+			logger.Error("Meter-referred-in-flow-not-present")
+			return changed, updated, err
 		}
-		deviceRules, err := agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: updatedFlows}, *lDevice.FlowGroups)
+		flowGroups, _ := agent.ListLogicalDeviceFlowGroups(ctx)
+		deviceRules, err := agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: updatedFlows}, *flowGroups)
 		if err != nil {
-			return err
+			return changed, updated, err
 		}
+
 		logger.Debugw("rules", log.Fields{"rules": deviceRules.String()})
-
-		//	Update model
-		if err := agent.updateLogicalDeviceFlowsWithoutLock(ctx, &ofp.Flows{Items: flows}); err != nil {
-			logger.Errorw("db-flow-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
-			return err
-		}
-
-		if !updated {
-			changedMeterStats := agent.updateFlowCountOfMeterStats(mod, meters, flow, false)
-			metersToUpdate := &ofp.Meters{}
-			if lDevice.Meters != nil {
-				metersToUpdate = &ofp.Meters{Items: meters}
-			}
-			if changedMeterStats {
-				//Update model
-				if err := agent.updateLogicalDeviceMetersWithoutLock(ctx, metersToUpdate); err != nil {
-					logger.Errorw("db-meter-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
-					return err
-				}
-				logger.Debugw("meter-stats-updated-in-DB-successfully", log.Fields{"updated_meters": meters})
-
+		//	Update store and cache
+		if updated {
+			if err := agent.updateLogicalDeviceFlow(ctx, flow, flowChunk); err != nil {
+				return changed, updated, err
 			}
 		}
-		// Send the flows to the devices
 		respChannels := agent.addFlowsAndGroupsToDevices(ctx, deviceRules, &flowMetadata)
-
 		// Create the go routines to wait
 		go func() {
 			// Wait for completion
@@ -927,48 +959,38 @@
 			}
 		}()
 	}
-	return nil
+	return changed, updated, nil
 }
 
 // revertAddedFlows reverts flows after the flowAdd request has failed.  All flows corresponding to that flowAdd request
 // will be reverted, both from the logical devices and the devices.
 func (agent *LogicalAgent) revertAddedFlows(ctx context.Context, mod *ofp.OfpFlowMod, addedFlow *ofp.OfpFlowStats, replacedFlow *ofp.OfpFlowStats, deviceRules *fu.DeviceRules, metadata *voltha.FlowMetadata) error {
 	logger.Debugw("revertFlowAdd", log.Fields{"added-flow": addedFlow, "replaced-flow": replacedFlow, "device-rules": deviceRules, "metadata": metadata})
-	if err := agent.requestQueue.WaitForGreenLight(context.Background()); err != nil {
-		return err
-	}
-	defer agent.requestQueue.RequestComplete()
 
-	lDevice := agent.getLogicalDeviceWithoutLock()
-
-	// Revert flows
-	clonedFlows := cloneFlows(lDevice.Flows.Items)
-	idx := fu.FindFlows(clonedFlows, addedFlow)
-	if idx < 0 {
+	agent.flowLock.RLock()
+	flowChunk, ok := agent.flows[addedFlow.Id]
+	agent.flowLock.RUnlock()
+	if !ok {
 		// Not found - do nothing
 		log.Debugw("flow-not-found", log.Fields{"added-flow": addedFlow})
 		return nil
 	}
-	if replacedFlow != nil {
-		clonedFlows[idx] = replacedFlow
-	} else {
-		clonedFlows = deleteFlowWithoutPreservingOrder(clonedFlows, idx)
-	}
-	lDevice.Flows = &ofp.Flows{Items: clonedFlows}
+	//Leave the map lock and syncronize per flow
+	flowChunk.lock.Lock()
+	defer flowChunk.lock.Unlock()
 
-	// Revert meters, if necessary
-	if lDevice.Meters != nil && len(lDevice.Meters.Items) > 0 {
-		meters := cloneMeters(lDevice.Meters.Items)
-		changedMeterStats := agent.updateFlowCountOfMeterStats(mod, meters, addedFlow, true)
-		if changedMeterStats {
-			lDevice.Meters = &ofp.Meters{Items: meters}
+	if replacedFlow != nil {
+		if err := agent.updateLogicalDeviceFlow(ctx, replacedFlow, flowChunk); err != nil {
+			return err
+		}
+	} else {
+		if err := agent.removeLogicalDeviceFlow(ctx, addedFlow.Id); err != nil {
+			return err
 		}
 	}
-
-	// Update the model
-	if err := agent.updateLogicalDeviceWithoutLock(ctx, lDevice); err != nil {
-		logger.Errorw("db-flow-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
-		return err
+	// Revert meters
+	if changedMeterStats := agent.updateFlowCountOfMeterStats(ctx, mod, addedFlow, true); !changedMeterStats {
+		return fmt.Errorf("Unable-to-revert-meterstats-for-flow-%s", strconv.FormatUint(addedFlow.Id, 10))
 	}
 
 	// Update the devices
@@ -1005,7 +1027,7 @@
 			if !foundMeter {
 				logger.Errorw("Meter-referred-by-flow-is-not-found-in-logicaldevice",
 					log.Fields{"meterID": flowMeterID, "Available-meters": meters, "flow": *flow})
-				return errors.New("Meter-referred-by-flow-is-not-found-in-logicaldevice")
+				return fmt.Errorf("Meter-referred-by-flow-is-not-found-in-logicaldevice.MeterId-%d", flowMeterID)
 			}
 		}
 	}
@@ -1020,63 +1042,57 @@
 	if mod == nil {
 		return nil
 	}
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+
+	fs, err := fu.FlowStatsEntryFromFlowModMessage(mod)
+	if err != nil {
 		return err
 	}
-	defer agent.requestQueue.RequestComplete()
 
-	lDevice := agent.getLogicalDeviceWithoutLock()
-
-	var meters []*ofp.OfpMeterEntry
-	var flows []*ofp.OfpFlowStats
-	var flowGroups []*ofp.OfpGroupEntry
-
-	if lDevice.Flows != nil && lDevice.Flows.Items != nil {
-		flows = lDevice.Flows.Items
-	}
-
-	if lDevice.Meters != nil && lDevice.Meters.Items != nil {
-		meters = lDevice.Meters.Items
-	}
-
-	if lDevice.FlowGroups != nil && lDevice.FlowGroups.Items != nil {
-		flowGroups = lDevice.FlowGroups.Items
-	}
-
-	//build a list of what to keep vs what to delete
-	toKeep := make([]*ofp.OfpFlowStats, 0)
+	//build a list of what to delete
 	toDelete := make([]*ofp.OfpFlowStats, 0)
-	for _, f := range flows {
-		// Check whether the flow and the flowmod matches
-		fs, err := fu.FlowStatsEntryFromFlowModMessage(mod)
-		if err != nil {
-			return err
-		}
-		if fu.FlowMatch(f, fs) {
-			toDelete = append(toDelete, f)
+	toDeleteChunks := make([]*FlowChunk, 0)
+	//Lock the map to search the matched flows
+	agent.flowLock.RLock()
+	for _, f := range agent.flows {
+		if fu.FlowMatch(f.flow, fs) {
+			toDelete = append(toDelete, f.flow)
+			toDeleteChunks = append(toDeleteChunks, f)
 			continue
 		}
 		// Check wild card match
-		if !fu.FlowMatchesMod(f, mod) {
-			toKeep = append(toKeep, f)
-		} else {
-			toDelete = append(toDelete, f)
+		if fu.FlowMatchesMod(f.flow, mod) {
+			toDelete = append(toDelete, f.flow)
+			toDeleteChunks = append(toDeleteChunks, f)
 		}
 	}
-
-	logger.Debugw("flowDelete", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "toKeep": len(toKeep), "toDelete": toDelete})
-
-	//Update flows
+	agent.flowLock.RUnlock()
+	//Delete the matched flows
 	if len(toDelete) > 0 {
+		logger.Debugw("flowDelete", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "toDelete": len(toDelete)})
+		var meters []*ofp.OfpMeterEntry
+		var flowGroups []*ofp.OfpGroupEntry
+		if ofpMeters, err := agent.ListLogicalDeviceMeters(ctx); err != nil {
+			meters = ofpMeters.Items
+		}
+
+		if groups, err := agent.ListLogicalDeviceFlowGroups(ctx); err != nil {
+			flowGroups = groups.Items
+		}
+
+		for _, fc := range toDeleteChunks {
+			if err := agent.deleteFlowAndUpdateMeterStats(ctx, mod, fc); err != nil {
+				return err
+			}
+		}
 		var flowMetadata voltha.FlowMetadata
 		if err := agent.GetMeterConfig(toDelete, meters, &flowMetadata); err != nil { // This should never happen
 			logger.Error("Meter-referred-in-flows-not-present")
-			return errors.New("Meter-referred-in-flows-not-present")
+			return err
 		}
-
 		var respChnls []coreutils.Response
 		var partialRoute bool
-		deviceRules, err := agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: toDelete}, ofp.FlowGroups{Items: flowGroups})
+		var deviceRules *fu.DeviceRules
+		deviceRules, err = agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: toDelete}, ofp.FlowGroups{Items: flowGroups})
 		if err != nil {
 			// A no route error means no route exists between the ports specified in the flow. This can happen when the
 			// child device is deleted and a request to delete flows from the parent device is received
@@ -1087,12 +1103,6 @@
 			partialRoute = true
 		}
 
-		// Update the dB
-		if err := agent.updateLogicalDeviceFlowsWithoutLock(ctx, &ofp.Flows{Items: toKeep}); err != nil {
-			logger.Errorw("cannot-update-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
-			return err
-		}
-
 		// Update the devices
 		if partialRoute {
 			respChnls = agent.deleteFlowsFromParentDevice(ctx, ofp.Flows{Items: toDelete}, &flowMetadata)
@@ -1109,11 +1119,23 @@
 			}
 		}()
 	}
-
 	//TODO: send announcement on delete
 	return nil
 }
 
+func (agent *LogicalAgent) deleteFlowAndUpdateMeterStats(ctx context.Context, mod *ofp.OfpFlowMod, chunk *FlowChunk) error {
+	chunk.lock.Lock()
+	defer chunk.lock.Unlock()
+	if changedMeter := agent.updateFlowCountOfMeterStats(ctx, mod, chunk.flow, false); !changedMeter {
+		return fmt.Errorf("Cannot-delete-flow-%s. Meter-update-failed", chunk.flow)
+	}
+	// Update store and cache
+	if err := agent.removeLogicalDeviceFlow(ctx, chunk.flow.Id); err != nil {
+		return fmt.Errorf("Cannot-delete-flows-%s. Delete-from-store-failed", chunk.flow)
+	}
+	return nil
+}
+
 func (agent *LogicalAgent) addFlowsAndGroupsToDevices(ctx context.Context, deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) []coreutils.Response {
 	logger.Debugw("send-add-flows-to-device-manager", log.Fields{"logicalDeviceID": agent.logicalDeviceID, "deviceRules": deviceRules, "flowMetadata": flowMetadata})
 
@@ -1220,98 +1242,77 @@
 
 //flowDeleteStrict deletes a flow from the flow table of that logical device
 func (agent *LogicalAgent) flowDeleteStrict(ctx context.Context, mod *ofp.OfpFlowMod) error {
-	logger.Debug("flowDeleteStrict")
+	logger.Debugw("flowDeleteStrict", log.Fields{"mod": mod})
 	if mod == nil {
 		return nil
 	}
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		return err
-	}
-	defer agent.requestQueue.RequestComplete()
 
-	lDevice := agent.getLogicalDeviceWithoutLock()
-
-	var meters []*ofp.OfpMeterEntry
-	var flows []*ofp.OfpFlowStats
-	var flowGroups []*ofp.OfpGroupEntry
-	if lDevice.Meters != nil && lDevice.Meters.Items != nil {
-		meters = lDevice.Meters.Items
-	}
-	if lDevice.Flows != nil && lDevice.Flows.Items != nil {
-		flows = lDevice.Flows.Items
-	}
-	if lDevice.FlowGroups != nil && lDevice.FlowGroups.Items != nil {
-		flowGroups = lDevice.FlowGroups.Items
-	}
-
-	changedFlow := false
-	changedMeter := false
 	flow, err := fu.FlowStatsEntryFromFlowModMessage(mod)
 	if err != nil {
 		return err
 	}
-	flowsToDelete := make([]*ofp.OfpFlowStats, 0)
-	idx := fu.FindFlows(flows, flow)
-	if idx >= 0 {
-		changedMeter = agent.updateFlowCountOfMeterStats(mod, meters, flows[idx], false)
-		flowsToDelete = append(flowsToDelete, flows[idx])
-		flows = append(flows[:idx], flows[idx+1:]...)
-		changedFlow = true
+	logger.Debugw("flow-id-in-flow-delete-strict", log.Fields{"flowID": flow.Id})
+	agent.flowLock.RLock()
+	flowChunk, ok := agent.flows[flow.Id]
+	agent.flowLock.RUnlock()
+	if !ok {
+		logger.Debugw("Skipping-flow-delete-strict-request. No-flow-found", log.Fields{"flowMod": mod})
+		return nil
+	}
+	//Release the map lock and syncronize per flow
+	flowChunk.lock.Lock()
+	defer flowChunk.lock.Unlock()
+
+	var meters []*ofp.OfpMeterEntry
+	var flowGroups []*ofp.OfpGroupEntry
+	if ofMeters, er := agent.ListLogicalDeviceMeters(ctx); er == nil {
+		meters = ofMeters.Items
+	}
+	if ofGroups, er := agent.ListLogicalDeviceFlowGroups(ctx); er == nil {
+		flowGroups = ofGroups.Items
+	}
+	if changedMeter := agent.updateFlowCountOfMeterStats(ctx, mod, flow, false); !changedMeter {
+		return fmt.Errorf("Cannot delete flow - %s. Meter update failed", flow)
+	}
+
+	var flowMetadata voltha.FlowMetadata
+	flowsToDelete := []*ofp.OfpFlowStats{flowChunk.flow}
+	if err := agent.GetMeterConfig(flowsToDelete, meters, &flowMetadata); err != nil {
+		logger.Error("meter-referred-in-flows-not-present")
+		return err
+	}
+	var respChnls []coreutils.Response
+	var partialRoute bool
+	deviceRules, err := agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: flowsToDelete}, ofp.FlowGroups{Items: flowGroups})
+	if err != nil {
+		// A no route error means no route exists between the ports specified in the flow. This can happen when the
+		// child device is deleted and a request to delete flows from the parent device is received
+		if !errors.Is(err, route.ErrNoRoute) {
+			logger.Errorw("unexpected-error-received", log.Fields{"flows-to-delete": flowsToDelete, "error": err})
+			return err
+		}
+		partialRoute = true
+	}
+
+	// Update the model
+	if err := agent.removeLogicalDeviceFlow(ctx, flow.Id); err != nil {
+		return err
+	}
+	// Update the devices
+	if partialRoute {
+		respChnls = agent.deleteFlowsFromParentDevice(ctx, ofp.Flows{Items: flowsToDelete}, &flowMetadata)
 	} else {
-		return fmt.Errorf("Cannot delete flow - %s", flow)
+		respChnls = agent.deleteFlowsAndGroupsFromDevices(ctx, deviceRules, &flowMetadata)
 	}
-	if changedMeter {
-		//Update model
-		metersToUpdate := &ofp.Meters{}
-		if lDevice.Meters != nil {
-			metersToUpdate = &ofp.Meters{Items: meters}
-		}
-		if err := agent.updateLogicalDeviceMetersWithoutLock(ctx, metersToUpdate); err != nil {
-			logger.Errorw("db-meter-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
-			return err
-		}
 
-	}
-	if changedFlow {
-		var flowMetadata voltha.FlowMetadata
-		if err := agent.GetMeterConfig(flowsToDelete, meters, &flowMetadata); err != nil {
-			logger.Error("meter-referred-in-flows-not-present")
-			return err
+	// Wait for completion
+	go func() {
+		if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
+			logger.Warnw("failure-deleting-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
+			//TODO: Revert flow changes
 		}
-		var respChnls []coreutils.Response
-		var partialRoute bool
-		deviceRules, err := agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: flowsToDelete}, ofp.FlowGroups{Items: flowGroups})
-		if err != nil {
-			// A no route error means no route exists between the ports specified in the flow. This can happen when the
-			// child device is deleted and a request to delete flows from the parent device is received
-			if !errors.Is(err, route.ErrNoRoute) {
-				logger.Errorw("unexpected-error-received", log.Fields{"flows-to-delete": flowsToDelete, "error": err})
-				return err
-			}
-			partialRoute = true
-		}
+	}()
 
-		// Update the dB
-		if err := agent.updateLogicalDeviceFlowsWithoutLock(ctx, &ofp.Flows{Items: flows}); err != nil {
-			logger.Errorw("cannot-update-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
-			return err
-		}
-
-		// Update the devices
-		if partialRoute {
-			respChnls = agent.deleteFlowsFromParentDevice(ctx, ofp.Flows{Items: flowsToDelete}, &flowMetadata)
-		} else {
-			respChnls = agent.deleteFlowsAndGroupsFromDevices(ctx, deviceRules, &flowMetadata)
-		}
-
-		// Wait for completion
-		go func() {
-			if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
-				logger.Warnw("failure-deleting-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
-				//TODO: Revert flow changes
-			}
-		}()
-	}
 	return nil
 }
 
@@ -1326,47 +1327,55 @@
 }
 
 func (agent *LogicalAgent) groupAdd(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
-	logger.Debug("groupAdd")
 	if groupMod == nil {
 		return nil
 	}
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+	logger.Debugw("groupAdd", log.Fields{"GroupId": groupMod.GroupId})
+	agent.groupLock.Lock()
+	_, ok := agent.groups[groupMod.GroupId]
+	if ok {
+		agent.groupLock.Unlock()
+		return fmt.Errorf("Group %d already exists", groupMod.GroupId)
+	}
+
+	groupEntry := fu.GroupEntryFromGroupMod(groupMod)
+	groupChunk := GroupChunk{
+		group: groupEntry,
+	}
+	//add to map
+	agent.groups[groupMod.GroupId] = &groupChunk
+	groupChunk.lock.Lock()
+	defer groupChunk.lock.Unlock()
+	agent.groupLock.Unlock()
+	//add to the kv store
+	path := fmt.Sprintf("groups/%s", agent.logicalDeviceID)
+	groupID := strconv.Itoa(int(groupMod.GroupId))
+	if err := agent.clusterDataProxy.AddWithID(ctx, path, groupID, groupEntry); err != nil {
+		logger.Errorw("failed-adding-group", log.Fields{"deviceID": agent.logicalDeviceID, "groupID": groupID, "err": err})
+		agent.groupLock.Lock()
+		delete(agent.groups, groupMod.GroupId)
+		agent.groupLock.Unlock()
 		return err
 	}
-	defer agent.requestQueue.RequestComplete()
+	deviceRules := fu.NewDeviceRules()
+	deviceRules.CreateEntryIfNotExist(agent.rootDeviceID)
+	fg := fu.NewFlowsAndGroups()
+	fg.AddGroup(fu.GroupEntryFromGroupMod(groupMod))
+	deviceRules.AddFlowsAndGroup(agent.rootDeviceID, fg)
 
-	lDevice := agent.getLogicalDeviceWithoutLock()
+	logger.Debugw("rules", log.Fields{"rules for group-add": deviceRules.String()})
 
-	groups := lDevice.FlowGroups.Items
-	if fu.FindGroup(groups, groupMod.GroupId) == -1 {
-		groups = append(groups, fu.GroupEntryFromGroupMod(groupMod))
+	// Update the devices
+	respChnls := agent.addFlowsAndGroupsToDevices(ctx, deviceRules, &voltha.FlowMetadata{})
 
-		deviceRules := fu.NewDeviceRules()
-		deviceRules.CreateEntryIfNotExist(agent.rootDeviceID)
-		fg := fu.NewFlowsAndGroups()
-		fg.AddGroup(fu.GroupEntryFromGroupMod(groupMod))
-		deviceRules.AddFlowsAndGroup(agent.rootDeviceID, fg)
-
-		logger.Debugw("rules", log.Fields{"rules for group-add": deviceRules.String()})
-
-		if err := agent.updateLogicalDeviceFlowGroupsWithoutLock(ctx, &ofp.FlowGroups{Items: groups}); err != nil {
-			logger.Errorw("cannot-update-group", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
-			return err
+	// Wait for completion
+	go func() {
+		if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
+			logger.Warnw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
+			//TODO: Revert flow changes
 		}
-
-		// Update the devices
-		respChnls := agent.addFlowsAndGroupsToDevices(ctx, deviceRules, &voltha.FlowMetadata{})
-
-		// Wait for completion
-		go func() {
-			if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
-				logger.Warnw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
-				//TODO: Revert flow changes
-			}
-		}()
-		return nil
-	}
-	return fmt.Errorf("Groups %d already present", groupMod.GroupId)
+	}()
+	return nil
 }
 
 func (agent *LogicalAgent) groupDelete(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
@@ -1374,51 +1383,68 @@
 	if groupMod == nil {
 		return nil
 	}
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		return err
-	}
-	defer agent.requestQueue.RequestComplete()
-
-	lDevice := agent.getLogicalDeviceWithoutLock()
-	groups := lDevice.FlowGroups.Items
-	flows := lDevice.Flows.Items
+	affectedFlows := make([]*ofp.OfpFlowStats, 0)
+	affectedGroups := make([]*ofp.OfpGroupEntry, 0)
 	var groupsChanged bool
-	flowsChanged := false
 	groupID := groupMod.GroupId
+	var err error
 	if groupID == uint32(ofp.OfpGroup_OFPG_ALL) {
-		//TODO we must delete all flows that point to this group and
-		//signal controller as requested by flow's flag
-		groups = []*ofp.OfpGroupEntry{}
+		if err := func() error {
+			agent.groupLock.Lock()
+			defer agent.groupLock.Unlock()
+			for key, groupChunk := range agent.groups {
+				//Remove from store and cache. Do this in a one time lock allocation.
+				path := fmt.Sprintf("groups/%s/%d", agent.logicalDeviceID, key)
+				if err := agent.clusterDataProxy.Remove(ctx, path); err != nil {
+					return fmt.Errorf("couldnt-deleted-group-from-store-%s", path)
+				}
+				delete(agent.groups, groupID)
+				var flows []*ofp.OfpFlowStats
+				if flows, err = agent.deleteFlowsOfGroup(ctx, key); err != nil {
+					logger.Errorw("cannot-update-flow-for-group-delete", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "groupID": key})
+					return err
+				}
+				affectedFlows = append(affectedFlows, flows...)
+				affectedGroups = append(affectedGroups, groupChunk.group)
+			}
+			return nil
+		}(); err != nil {
+			return err
+		}
 		groupsChanged = true
 	} else {
-		idx := fu.FindGroup(groups, groupID)
-		if idx == -1 {
-			return nil // Valid case
+		agent.groupLock.RLock()
+		groupChunk, ok := agent.groups[groupID]
+		agent.groupLock.RUnlock()
+		if !ok {
+			logger.Warnw("group-not-found", log.Fields{"groupID": groupID})
+			return nil
 		}
-		flowsChanged, flows = fu.FlowsDeleteByGroupId(flows, groupID)
-		groups = append(groups[:idx], groups[idx+1:]...)
+		groupChunk.lock.Lock()
+		defer groupChunk.lock.Unlock()
+		var flows []*ofp.OfpFlowStats
+		if flows, err = agent.deleteFlowsOfGroup(ctx, groupID); err != nil {
+			logger.Errorw("cannot-update-flow-for-group-delete", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "groupID": groupID})
+			return err
+		}
+		//remove from store
+		if err := agent.removeLogicalDeviceFlowGroup(ctx, groupID); err != nil {
+			return err
+		}
+		affectedFlows = append(affectedFlows, flows...)
+		affectedGroups = append(affectedGroups, groupChunk.group)
 		groupsChanged = true
+
 	}
-	if flowsChanged || groupsChanged {
-		deviceRules, err := agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: flows}, ofp.FlowGroups{Items: groups})
+
+	if err != nil || groupsChanged {
+		var deviceRules *fu.DeviceRules
+		deviceRules, err = agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: affectedFlows}, ofp.FlowGroups{Items: affectedGroups})
 		if err != nil {
 			return err
 		}
 		logger.Debugw("rules", log.Fields{"rules": deviceRules.String()})
 
-		if groupsChanged {
-			if err := agent.updateLogicalDeviceFlowGroupsWithoutLock(ctx, &ofp.FlowGroups{Items: groups}); err != nil {
-				logger.Errorw("cannot-update-group", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
-				return err
-			}
-		}
-		if flowsChanged {
-			if err := agent.updateLogicalDeviceFlowsWithoutLock(ctx, &ofp.Flows{Items: flows}); err != nil {
-				logger.Errorw("cannot-update-flow", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
-				return err
-			}
-		}
-
 		// Update the devices
 		respChnls := agent.updateFlowsAndGroupsOfDevice(ctx, deviceRules, nil)
 
@@ -1433,53 +1459,65 @@
 	return nil
 }
 
+func (agent *LogicalAgent) deleteFlowsOfGroup(ctx context.Context, groupID uint32) ([]*ofp.OfpFlowStats, error) {
+	logger.Infow("Delete-flows-matching-group", log.Fields{"groupID": groupID})
+	var flowsRemoved []*ofp.OfpFlowStats
+	agent.flowLock.Lock()
+	defer agent.flowLock.Unlock()
+	for flowID, flowChunk := range agent.flows {
+		if fu.FlowHasOutGroup(flowChunk.flow, groupID) {
+			path := fmt.Sprintf("logical_flows/%s/%d", agent.logicalDeviceID, flowID)
+			if err := agent.clusterDataProxy.Remove(ctx, path); err != nil {
+				return nil, fmt.Errorf("couldnt-delete-flow-from-store-%s", path)
+			}
+			delete(agent.flows, flowID)
+			flowsRemoved = append(flowsRemoved, flowChunk.flow)
+		}
+	}
+	return flowsRemoved, nil
+}
+
 func (agent *LogicalAgent) groupModify(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
 	logger.Debug("groupModify")
 	if groupMod == nil {
 		return nil
 	}
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		return err
-	}
-	defer agent.requestQueue.RequestComplete()
 
-	lDevice := agent.getLogicalDeviceWithoutLock()
-	groups := lDevice.FlowGroups.Items
-	var groupsChanged bool
 	groupID := groupMod.GroupId
-	idx := fu.FindGroup(groups, groupID)
-	if idx == -1 {
+	agent.groupLock.RLock()
+	groupChunk, ok := agent.groups[groupID]
+	agent.groupLock.RUnlock()
+	if !ok {
 		return fmt.Errorf("group-absent:%d", groupID)
 	}
+	//Don't let any other thread to make modifications to this group till all done here.
+	groupChunk.lock.Lock()
+	defer groupChunk.lock.Unlock()
 	//replace existing group entry with new group definition
 	groupEntry := fu.GroupEntryFromGroupMod(groupMod)
-	groups[idx] = groupEntry
-	groupsChanged = true
-	if groupsChanged {
-		deviceRules := fu.NewDeviceRules()
-		deviceRules.CreateEntryIfNotExist(agent.rootDeviceID)
-		fg := fu.NewFlowsAndGroups()
-		fg.AddGroup(fu.GroupEntryFromGroupMod(groupMod))
-		deviceRules.AddFlowsAndGroup(agent.rootDeviceID, fg)
+	deviceRules := fu.NewDeviceRules()
+	deviceRules.CreateEntryIfNotExist(agent.rootDeviceID)
+	fg := fu.NewFlowsAndGroups()
+	fg.AddGroup(fu.GroupEntryFromGroupMod(groupMod))
+	deviceRules.AddFlowsAndGroup(agent.rootDeviceID, fg)
 
-		logger.Debugw("rules", log.Fields{"rules for group-modify": deviceRules.String()})
-
-		if err := agent.updateLogicalDeviceFlowGroupsWithoutLock(ctx, &ofp.FlowGroups{Items: groups}); err != nil {
-			logger.Errorw("Cannot-update-logical-group", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
-			return err
-		}
-
-		// Update the devices
-		respChnls := agent.updateFlowsAndGroupsOfDevice(ctx, deviceRules, &voltha.FlowMetadata{})
-
-		// Wait for completion
-		go func() {
-			if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
-				logger.Warnw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
-				//TODO: Revert flow changes
-			}
-		}()
+	logger.Debugw("rules", log.Fields{"rules-for-group-modify": deviceRules.String()})
+	//update KV
+	if err := agent.updateLogicalDeviceFlowGroup(ctx, groupEntry, groupChunk); err != nil {
+		logger.Errorw("Cannot-update-logical-group", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
+		return err
 	}
+
+	// Update the devices
+	respChnls := agent.updateFlowsAndGroupsOfDevice(ctx, deviceRules, &voltha.FlowMetadata{})
+
+	// Wait for completion
+	go func() {
+		if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
+			logger.Warnw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
+			//TODO: Revert flow changes
+		}
+	}()
 	return nil
 }
 
@@ -1514,7 +1552,6 @@
 
 		// Remove the logical port from cache
 		agent.deleteLogicalPortsFromMap([]uint32{lPort.DevicePortNo})
-
 		// Reset the logical device routes
 		go func() {
 			if err := agent.buildRoutes(context.Background()); err != nil {
@@ -1993,6 +2030,64 @@
 	}
 }
 
+func (agent *LogicalAgent) loadFlows(ctx context.Context) {
+	agent.flowLock.Lock()
+	defer agent.flowLock.Unlock()
+
+	var flowList []*ofp.OfpFlowStats
+	if err := agent.clusterDataProxy.List(ctx, "logical_flows/"+agent.logicalDeviceID, &flowList); err != nil {
+		logger.Errorw("Failed-to-list-logicalflows-from-cluster-data-proxy", log.Fields{"error": err})
+		return
+	}
+	for _, flow := range flowList {
+		if flow != nil {
+			flowsChunk := FlowChunk{
+				flow: flow,
+			}
+			agent.flows[flow.Id] = &flowsChunk
+		}
+	}
+}
+
+func (agent *LogicalAgent) loadMeters(ctx context.Context) {
+	agent.meterLock.Lock()
+	defer agent.meterLock.Unlock()
+
+	var meters []*ofp.OfpMeterEntry
+	if err := agent.clusterDataProxy.List(ctx, "meters/"+agent.logicalDeviceID, &meters); err != nil {
+		logger.Errorw("Failed-to-list-meters-from-proxy", log.Fields{"error": err})
+		return
+	}
+	for _, meter := range meters {
+		if meter.Config != nil {
+			meterChunk := MeterChunk{
+				meter: meter,
+			}
+			agent.meters[meter.Config.MeterId] = &meterChunk
+		}
+	}
+}
+
+func (agent *LogicalAgent) loadGroups(ctx context.Context) {
+	agent.groupLock.Lock()
+	defer agent.groupLock.Unlock()
+
+	var groups []*ofp.OfpGroupEntry
+	if err := agent.clusterDataProxy.List(ctx, "groups/"+agent.logicalDeviceID, &groups); err != nil {
+		logger.Errorw("Failed-to-list-groups-from-proxy", log.Fields{"error": err})
+		return
+	}
+	for _, group := range groups {
+		if group.Desc != nil {
+			groupChunk := GroupChunk{
+				group: group,
+			}
+			agent.groups[group.Desc.GroupId] = &groupChunk
+		}
+	}
+	logger.Infow("Groups-are-loaded-into-the-cache-from-store", log.Fields{"logicalDeviceID": agent.logicalDeviceID})
+}
+
 func (agent *LogicalAgent) isNNIPort(portNo uint32) bool {
 	agent.lockLogicalPortsNo.RLock()
 	defer agent.lockLogicalPortsNo.RUnlock()
diff --git a/rw_core/core/device/logical_agent_test.go b/rw_core/core/device/logical_agent_test.go
index e562400..8ec5454 100644
--- a/rw_core/core/device/logical_agent_test.go
+++ b/rw_core/core/device/logical_agent_test.go
@@ -17,18 +17,19 @@
 
 import (
 	"context"
-	"github.com/opencord/voltha-go/db/model"
-	"github.com/opencord/voltha-go/rw_core/core/adapter"
-	"github.com/opencord/voltha-lib-go/v3/pkg/db"
 	"math/rand"
 	"sync"
 	"testing"
 	"time"
 
+	"github.com/opencord/voltha-go/db/model"
+	"github.com/opencord/voltha-go/rw_core/core/adapter"
+	"github.com/opencord/voltha-lib-go/v3/pkg/db"
+	fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
+
 	"github.com/gogo/protobuf/proto"
 	"github.com/opencord/voltha-go/rw_core/config"
 	com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
-	fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
 	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
 	mock_etcd "github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd"
 	mock_kafka "github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka"
@@ -566,9 +567,13 @@
 		assert.Nil(t, err)
 		localWG.Done()
 	}()
-
 	// wait for go routines to be done
 	localWG.Wait()
+	meterEntry := fu.MeterEntryFromMeterMod(meterMod)
+
+	meterChunk, ok := ldAgent.meters[meterMod.MeterId]
+	assert.Equal(t, ok, true)
+	assert.True(t, proto.Equal(meterEntry, meterChunk.meter))
 
 	expectedChange := proto.Clone(originalLogicalDevice).(*voltha.LogicalDevice)
 	expectedChange.Ports[0].OfpPort.Config = originalLogicalDevice.Ports[0].OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
@@ -577,8 +582,6 @@
 	expectedChange.Ports[1].OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
 	expectedChange.Ports[2].OfpPort.Config = originalLogicalDevice.Ports[0].OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
 	expectedChange.Ports[2].OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LIVE)
-	expectedChange.Meters = &voltha.Meters{Items: nil}
-	expectedChange.Meters.Items = append(expectedChange.Meters.Items, fu.MeterEntryFromMeterMod(meterMod))
 	updatedLogicalDevice, _ := ldAgent.GetLogicalDevice(context.Background())
 	assert.NotNil(t, updatedLogicalDevice)
 	assert.True(t, proto.Equal(expectedChange, updatedLogicalDevice))