[VOL-3005] Separate Flows from Device

Some unit test helper functions are also moved to a shared test util
package (rw_core/test). New per-device flow and group loaders and the
Proxy implementation are applied.
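
A minimal sketch, for illustration only, of the loader access pattern
this change introduces (List / Lock / GetReadOnly / Unlock). It is a
self-contained in-memory stand-in with simplified, assumed types; the
real loader lives in rw_core/core/device/flow, is backed by a
model.Proxy and persists flows to the KV store.

// Hedged sketch: toy flow loader mirroring the access pattern used by
// the device agent in this change. Not the real implementation.
package main

import (
	"fmt"
	"sync"
)

// flowStats is a simplified stand-in for ofp.OfpFlowStats.
type flowStats struct {
	ID       uint64
	Priority uint32
}

type loader struct {
	mu    sync.Mutex // the real loader uses finer-grained, per-entry locking
	flows map[uint64]*flowStats
}

// handle grants exclusive access to one flow until Unlock is called.
type handle struct {
	loader *loader
	id     uint64
}

func (h *handle) GetReadOnly() *flowStats { return h.loader.flows[h.id] }
func (h *handle) Unlock()                 { h.loader.mu.Unlock() }

// Lock returns a locked handle to the flow, if it exists.
func (l *loader) Lock(id uint64) (*handle, bool) {
	l.mu.Lock()
	if _, ok := l.flows[id]; !ok {
		l.mu.Unlock()
		return nil, false
	}
	return &handle{loader: l, id: id}, true
}

// List returns the cached flow IDs, as the agent does before a bulk update.
func (l *loader) List() map[uint64]struct{} {
	l.mu.Lock()
	defer l.mu.Unlock()
	ids := make(map[uint64]struct{}, len(l.flows))
	for id := range l.flows {
		ids[id] = struct{}{}
	}
	return ids
}

func main() {
	l := &loader{flows: map[uint64]*flowStats{123: {ID: 123, Priority: 100}}}
	// Same access pattern as listDeviceFlows(): iterate IDs, lock each
	// handle, read the value, unlock.
	for id := range l.List() {
		if h, have := l.Lock(id); have {
			fmt.Println("flow", h.GetReadOnly().ID)
			h.Unlock()
		}
	}
}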

Change-Id: Icf5a6f0a42a2dbaeff768fdb108f5e9b46644977
diff --git a/rw_core/core/api/common_test.go b/rw_core/core/api/common_test.go
index f9bb047..097b1c1 100644
--- a/rw_core/core/api/common_test.go
+++ b/rw_core/core/api/common_test.go
@@ -18,24 +18,12 @@
 import (
 	"context"
 	"fmt"
-	"strconv"
 	"time"
 
 	"github.com/golang/protobuf/ptypes/empty"
 	"github.com/google/uuid"
-	"github.com/opencord/voltha-go/rw_core/config"
-	cm "github.com/opencord/voltha-go/rw_core/mocks"
-	"github.com/opencord/voltha-lib-go/v3/pkg/adapters"
-	com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
-	"github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
-	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
-	"github.com/opencord/voltha-lib-go/v3/pkg/log"
-	mock_etcd "github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
-	"github.com/phayes/freeport"
-	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/metadata"
-	"google.golang.org/grpc/status"
 )
 
 const (
@@ -43,11 +31,6 @@
 	retryInterval         = 50 * time.Millisecond
 )
 
-const (
-	OltAdapter = iota
-	OnuAdapter
-)
-
 var (
 	coreInCompeteMode bool
 )
@@ -74,67 +57,6 @@
 	return context.Background()
 }
 
-//startEmbeddedEtcdServer creates and starts an Embedded etcd server locally.
-func startEmbeddedEtcdServer(configName, storageDir, logLevel string) (*mock_etcd.EtcdServer, int, error) {
-	kvClientPort, err := freeport.GetFreePort()
-	if err != nil {
-		return nil, 0, err
-	}
-	peerPort, err := freeport.GetFreePort()
-	if err != nil {
-		return nil, 0, err
-	}
-	etcdServer := mock_etcd.StartEtcdServer(mock_etcd.MKConfig(configName, kvClientPort, peerPort, storageDir, logLevel))
-	if etcdServer == nil {
-		return nil, 0, status.Error(codes.Internal, "Embedded server failed to start")
-	}
-	return etcdServer, kvClientPort, nil
-}
-
-func stopEmbeddedEtcdServer(server *mock_etcd.EtcdServer) {
-	if server != nil {
-		server.Stop()
-	}
-}
-
-func setupKVClient(cf *config.RWCoreFlags, coreInstanceID string) kvstore.Client {
-	addr := cf.KVStoreHost + ":" + strconv.Itoa(cf.KVStorePort)
-	client, err := kvstore.NewEtcdClient(addr, cf.KVStoreTimeout, log.FatalLevel)
-	if err != nil {
-		panic("no kv client")
-	}
-	return client
-}
-
-func createMockAdapter(adapterType int, kafkaClient kafka.Client, coreInstanceID string, coreName string, adapterName string) (adapters.IAdapter, error) {
-	var err error
-	var adapter adapters.IAdapter
-	adapterKafkaICProxy := kafka.NewInterContainerProxy(
-		kafka.MsgClient(kafkaClient),
-		kafka.DefaultTopic(&kafka.Topic{Name: adapterName}))
-	adapterCoreProxy := com.NewCoreProxy(adapterKafkaICProxy, adapterName, coreName)
-	var adapterReqHandler *com.RequestHandlerProxy
-	switch adapterType {
-	case OltAdapter:
-		adapter = cm.NewOLTAdapter(adapterCoreProxy)
-	case OnuAdapter:
-		adapter = cm.NewONUAdapter(adapterCoreProxy)
-	default:
-		logger.Fatalf("invalid-adapter-type-%d", adapterType)
-	}
-	adapterReqHandler = com.NewRequestHandlerProxy(coreInstanceID, adapter, adapterCoreProxy)
-
-	if err = adapterKafkaICProxy.Start(); err != nil {
-		logger.Errorw("Failure-starting-adapter-intercontainerProxy", log.Fields{"error": err})
-		return nil, err
-	}
-	if err = adapterKafkaICProxy.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: adapterName}, adapterReqHandler); err != nil {
-		logger.Errorw("Failure-to-subscribe-onu-request-handler", log.Fields{"error": err})
-		return nil, err
-	}
-	return adapter, nil
-}
-
 func waitUntilDeviceReadiness(deviceID string,
 	timeout time.Duration,
 	verificationFunction isDeviceConditionSatisfied,
diff --git a/rw_core/core/api/grpc_nbi_handler_test.go b/rw_core/core/api/grpc_nbi_handler_test.go
index 634a286..543a19d 100755
--- a/rw_core/core/api/grpc_nbi_handler_test.go
+++ b/rw_core/core/api/grpc_nbi_handler_test.go
@@ -35,13 +35,12 @@
 	"github.com/opencord/voltha-go/rw_core/core/adapter"
 	"github.com/opencord/voltha-go/rw_core/core/device"
 	cm "github.com/opencord/voltha-go/rw_core/mocks"
+	tst "github.com/opencord/voltha-go/rw_core/test"
 	"github.com/opencord/voltha-lib-go/v3/pkg/db"
 	"github.com/opencord/voltha-lib-go/v3/pkg/flows"
 	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
-	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	mock_etcd "github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd"
 	mock_kafka "github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka"
-	"github.com/opencord/voltha-lib-go/v3/pkg/version"
 	ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
 	"github.com/phayes/freeport"
@@ -50,10 +49,6 @@
 	"google.golang.org/grpc/status"
 )
 
-const (
-	coreName = "rw_core"
-)
-
 type NBTest struct {
 	etcdServer        *mock_etcd.EtcdServer
 	deviceMgr         *device.Manager
@@ -77,7 +72,7 @@
 	test := &NBTest{}
 	// Start the embedded etcd server
 	var err error
-	test.etcdServer, test.kvClientPort, err = startEmbeddedEtcdServer("voltha.rwcore.nb.test", "voltha.rwcore.nb.etcd", "error")
+	test.etcdServer, test.kvClientPort, err = tst.StartEmbeddedEtcdServer("voltha.rwcore.nb.test", "voltha.rwcore.nb.etcd", "error")
 	if err != nil {
 		logger.Fatal(err)
 	}
@@ -107,7 +102,7 @@
 	cfg.GrpcPort = grpcPort
 	cfg.GrpcHost = "127.0.0.1"
 	setCoreCompeteMode(inCompeteMode)
-	client := setupKVClient(cfg, nb.coreInstanceID)
+	client := tst.SetupKVClient(cfg, nb.coreInstanceID)
 	backend := &db.Backend{
 		Client:                  client,
 		StoreType:               cfg.KVStoreType,
@@ -141,58 +136,6 @@
 	}
 }
 
-func (nb *NBTest) createAndregisterAdapters(t *testing.T) {
-	// Setup the mock OLT adapter
-	oltAdapter, err := createMockAdapter(OltAdapter, nb.kClient, nb.coreInstanceID, coreName, nb.oltAdapterName)
-	if err != nil {
-		logger.Fatalw("setting-mock-olt-adapter-failed", log.Fields{"error": err})
-	}
-	nb.oltAdapter = (oltAdapter).(*cm.OLTAdapter)
-	nb.numONUPerOLT = nb.oltAdapter.GetNumONUPerOLT()
-	nb.startingUNIPortNo = nb.oltAdapter.GetStartingUNIPortNo()
-
-	//	Register the adapter
-	registrationData := &voltha.Adapter{
-		Id:             nb.oltAdapterName,
-		Vendor:         "Voltha-olt",
-		Version:        version.VersionInfo.Version,
-		Type:           nb.oltAdapterName,
-		CurrentReplica: 1,
-		TotalReplicas:  1,
-		Endpoint:       nb.oltAdapterName,
-	}
-	types := []*voltha.DeviceType{{Id: nb.oltAdapterName, Adapter: nb.oltAdapterName, AcceptsAddRemoveFlowUpdates: true}}
-	deviceTypes := &voltha.DeviceTypes{Items: types}
-	if _, err := nb.adapterMgr.RegisterAdapter(registrationData, deviceTypes); err != nil {
-		logger.Errorw("failed-to-register-adapter", log.Fields{"error": err})
-		assert.NotNil(t, err)
-	}
-
-	// Setup the mock ONU adapter
-	onuAdapter, err := createMockAdapter(OnuAdapter, nb.kClient, nb.coreInstanceID, coreName, nb.onuAdapterName)
-	if err != nil {
-		logger.Fatalw("setting-mock-onu-adapter-failed", log.Fields{"error": err})
-	}
-	nb.onuAdapter = (onuAdapter).(*cm.ONUAdapter)
-
-	//	Register the adapter
-	registrationData = &voltha.Adapter{
-		Id:             nb.onuAdapterName,
-		Vendor:         "Voltha-onu",
-		Version:        version.VersionInfo.Version,
-		Type:           nb.onuAdapterName,
-		CurrentReplica: 1,
-		TotalReplicas:  1,
-		Endpoint:       nb.onuAdapterName,
-	}
-	types = []*voltha.DeviceType{{Id: nb.onuAdapterName, Adapter: nb.onuAdapterName, AcceptsAddRemoveFlowUpdates: true}}
-	deviceTypes = &voltha.DeviceTypes{Items: types}
-	if _, err := nb.adapterMgr.RegisterAdapter(registrationData, deviceTypes); err != nil {
-		logger.Errorw("failed-to-register-adapter", log.Fields{"error": err})
-		assert.NotNil(t, err)
-	}
-}
-
 func (nb *NBTest) stopAll() {
 	if nb.kClient != nil {
 		nb.kClient.Stop()
@@ -201,7 +144,7 @@
 		nb.kmp.Stop()
 	}
 	if nb.etcdServer != nil {
-		stopEmbeddedEtcdServer(nb.etcdServer)
+		tst.StopEmbeddedEtcdServer(nb.etcdServer)
 	}
 }
 
@@ -1287,7 +1230,9 @@
 	nb.testCoreWithoutData(t, nbi)
 
 	// Create/register the adapters
-	nb.createAndregisterAdapters(t)
+	nb.oltAdapter, nb.onuAdapter = tst.CreateAndregisterAdapters(t, nb.kClient, nb.coreInstanceID, nb.oltAdapterName, nb.onuAdapterName, nb.adapterMgr)
+	nb.numONUPerOLT = nb.oltAdapter.GetNumONUPerOLT()
+	nb.startingUNIPortNo = nb.oltAdapter.GetStartingUNIPortNo()
 
 	// 2. Test adapter registration
 	nb.testAdapterRegistration(t, nbi)
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index 918432f..fe379d6 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -22,18 +22,21 @@
 	"errors"
 	"fmt"
 	"reflect"
+	"strconv"
 	"sync"
 	"time"
 
 	"github.com/golang/protobuf/ptypes"
 	"github.com/opencord/voltha-go/rw_core/core/adapter"
+	"github.com/opencord/voltha-go/rw_core/core/device/flow"
+	"github.com/opencord/voltha-go/rw_core/core/device/group"
 	"github.com/opencord/voltha-go/rw_core/core/device/remote"
+	fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
 	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
 
 	"github.com/gogo/protobuf/proto"
 	"github.com/opencord/voltha-go/db/model"
 	coreutils "github.com/opencord/voltha-go/rw_core/utils"
-	fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	ic "github.com/opencord/voltha-protos/v3/go/inter_container"
 	ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
@@ -59,10 +62,13 @@
 	startOnce      sync.Once
 	stopOnce       sync.Once
 	stopped        bool
+
+	flowLoader  *flow.Loader
+	groupLoader *group.Loader
 }
 
 //newAgent creates a new device agent. The device will be initialized when start() is called.
-func newAgent(ap *remote.AdapterProxy, device *voltha.Device, deviceMgr *Manager, deviceProxy *model.Proxy, timeout time.Duration) *Agent {
+func newAgent(ap *remote.AdapterProxy, device *voltha.Device, deviceMgr *Manager, dbProxy *model.Path, deviceProxy *model.Proxy, timeout time.Duration) *Agent {
 	var agent Agent
 	agent.adapterProxy = ap
 	if device.Id == "" {
@@ -81,6 +87,9 @@
 	agent.defaultTimeout = timeout
 	agent.device = proto.Clone(device).(*voltha.Device)
 	agent.requestQueue = coreutils.NewRequestQueue()
+	agent.flowLoader = flow.NewLoader(dbProxy.SubPath("flows").Proxy(device.Id))
+	agent.groupLoader = group.NewLoader(dbProxy.SubPath("groups").Proxy(device.Id))
+
 	return &agent
 }
 
@@ -114,6 +123,9 @@
 
 		agent.deviceType = device.Adapter
 		agent.device = proto.Clone(device).(*voltha.Device)
+		// load the flows and groups from KV to cache
+		agent.flowLoader.Load(ctx)
+		agent.groupLoader.Load(ctx)
 
 		logger.Infow("device-loaded-from-dB", log.Fields{"device-id": agent.deviceID})
 	} else {
@@ -138,7 +150,6 @@
 		}
 		agent.device = device
 	}
-
 	startSucceeded = true
 	logger.Debugw("device-agent-started", log.Fields{"device-id": agent.deviceID})
 
@@ -191,6 +202,8 @@
 
 	agent.deviceType = device.Adapter
 	agent.device = device
+	agent.flowLoader.Load(ctx)
+	agent.groupLoader.Load(ctx)
 	logger.Debugw("reconciled-device-agent-devicetype", log.Fields{"device-id": agent.deviceID, "type": agent.deviceType})
 }
 
@@ -317,6 +330,15 @@
 	}
 }
 
+//replaceFlowInList removes the old flow from the list and adds the new one.
+func replaceFlowInList(flowList []*ofp.OfpFlowStats, oldFlow *ofp.OfpFlowStats, newFlow *ofp.OfpFlowStats) []*ofp.OfpFlowStats {
+	if idx := fu.FindFlows(flowList, oldFlow); idx != -1 {
+		flowList = deleteFlowWithoutPreservingOrder(flowList, idx)
+	}
+	flowList = append(flowList, newFlow)
+	return flowList
+}
+
 //deleteFlowWithoutPreservingOrder removes a flow specified by index from the flows slice.  This function will
 //panic if the index is out of range.
 func deleteFlowWithoutPreservingOrder(flows []*ofp.OfpFlowStats, index int) []*ofp.OfpFlowStats {
@@ -325,6 +347,15 @@
 	return flows[:len(flows)-1]
 }
 
+//replaceGroupInList removes the old group from the list and adds the new one.
+func replaceGroupInList(groupList []*ofp.OfpGroupEntry, oldGroup *ofp.OfpGroupEntry, newGroup *ofp.OfpGroupEntry) []*ofp.OfpGroupEntry {
+	if idx := fu.FindGroup(groupList, oldGroup.Desc.GroupId); idx != -1 {
+		groupList = deleteGroupWithoutPreservingOrder(groupList, idx)
+	}
+	groupList = append(groupList, newGroup)
+	return groupList
+}
+
 //deleteGroupWithoutPreservingOrder removes a group specified by index from the groups slice.  This function will
 //panic if the index is out of range.
 func deleteGroupWithoutPreservingOrder(groups []*ofp.OfpGroupEntry, index int) []*ofp.OfpGroupEntry {
@@ -333,100 +364,70 @@
 	return groups[:len(groups)-1]
 }
 
-func flowsToUpdateToDelete(newFlows, existingFlows []*ofp.OfpFlowStats) (updatedNewFlows, flowsToDelete, updatedAllFlows []*ofp.OfpFlowStats) {
-	// Process flows
-	for _, flow := range existingFlows {
-		if idx := fu.FindFlows(newFlows, flow); idx == -1 {
-			updatedAllFlows = append(updatedAllFlows, flow)
-		} else {
-			// We have a matching flow (i.e. the following field matches: "TableId", "Priority", "Flags", "Cookie",
-			// "Match".  If this is an exact match (i.e. all other fields matches as well) then this flow will be
-			// ignored.  Otherwise, the previous flow will be deleted and the new one added
-			if proto.Equal(newFlows[idx], flow) {
-				// Flow already exist, remove it from the new flows but keep it in the updated flows slice
-				newFlows = deleteFlowWithoutPreservingOrder(newFlows, idx)
-				updatedAllFlows = append(updatedAllFlows, flow)
-			} else {
-				// Minor change to flow, delete old and add new one
-				flowsToDelete = append(flowsToDelete, flow)
-			}
-		}
-	}
-	updatedAllFlows = append(updatedAllFlows, newFlows...)
-	return newFlows, flowsToDelete, updatedAllFlows
-}
+func (agent *Agent) addFlowsToAdapter(ctx context.Context, newFlows []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
+	logger.Debugw("add-flows-to-adapters", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "flow-metadata": flowMetadata})
 
-func groupsToUpdateToDelete(newGroups, existingGroups []*ofp.OfpGroupEntry) (updatedNewGroups, groupsToDelete, updatedAllGroups []*ofp.OfpGroupEntry) {
-	for _, group := range existingGroups {
-		if idx := fu.FindGroup(newGroups, group.Desc.GroupId); idx == -1 { // does not exist now
-			updatedAllGroups = append(updatedAllGroups, group)
-		} else {
-			// Follow same logic as flows
-			if proto.Equal(newGroups[idx], group) {
-				// Group already exist, remove it from the new groups
-				newGroups = deleteGroupWithoutPreservingOrder(newGroups, idx)
-				updatedAllGroups = append(updatedAllGroups, group)
-			} else {
-				// Minor change to group, delete old and add new one
-				groupsToDelete = append(groupsToDelete, group)
-			}
-		}
-	}
-	updatedAllGroups = append(updatedAllGroups, newGroups...)
-	return newGroups, groupsToDelete, updatedAllGroups
-}
-
-func (agent *Agent) addFlowsAndGroupsToAdapter(ctx context.Context, newFlows []*ofp.OfpFlowStats, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
-	logger.Debugw("add-flows-groups-to-adapters", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "groups": newGroups, "flow-metadata": flowMetadata})
-
-	if (len(newFlows) | len(newGroups)) == 0 {
-		logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "groups": newGroups})
+	if (len(newFlows)) == 0 {
+		logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows})
 		return coreutils.DoneResponse(), nil
 	}
-
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		return coreutils.DoneResponse(), err
-	}
-	defer agent.requestQueue.RequestComplete()
-
 	device := agent.getDeviceWithoutLock()
 	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
 	if err != nil {
 		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
 	}
+	updatedAllFlows := make([]*ofp.OfpFlowStats, 0)
+	if !dType.AcceptsAddRemoveFlowUpdates {
+		flowIDs := agent.flowLoader.List()
+		for flowID := range flowIDs {
+			if flowHandle, have := agent.flowLoader.Lock(flowID); have {
+				updatedAllFlows = append(updatedAllFlows, flowHandle.GetReadOnly())
+				flowHandle.Unlock()
+			}
+		}
+	}
+	flowsToAdd := make([]*ofp.OfpFlowStats, 0)
+	flowsToDelete := make([]*ofp.OfpFlowStats, 0)
+	for _, flow := range newFlows {
+		flowHandle, created, err := agent.flowLoader.LockOrCreate(ctx, flow)
+		if err != nil {
+			return coreutils.DoneResponse(), err
+		}
 
-	existingFlows := proto.Clone(device.Flows).(*voltha.Flows)
-	existingGroups := proto.Clone(device.FlowGroups).(*ofp.FlowGroups)
+		if created {
+			flowsToAdd = append(flowsToAdd, flow)
+			updatedAllFlows = append(updatedAllFlows, flow)
+		} else {
+			flowToReplace := flowHandle.GetReadOnly()
+			if !proto.Equal(flowToReplace, flow) {
+				//Flow needs to be updated.
+				if err := flowHandle.Update(ctx, flow); err != nil {
+					flowHandle.Unlock()
+					return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-flow-%d-to-device-%s", flow.Id, agent.deviceID)
+				}
+				flowsToDelete = append(flowsToDelete, flowToReplace)
+				flowsToAdd = append(flowsToAdd, flow)
+				updatedAllFlows = replaceFlowInList(updatedAllFlows, flowToReplace, flow)
+			} else {
+				//No need to change the flow; it already exists.
+				logger.Debugw("No-need-to-change-already-existing-flow", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "flow-metadata": flowMetadata})
+			}
+		}
 
-	// Process flows
-	newFlows, flowsToDelete, updatedAllFlows := flowsToUpdateToDelete(newFlows, existingFlows.Items)
-
-	// Process groups
-	newGroups, groupsToDelete, updatedAllGroups := groupsToUpdateToDelete(newGroups, existingGroups.Items)
-
-	// Sanity check
-	if (len(updatedAllFlows) | len(flowsToDelete) | len(updatedAllGroups) | len(groupsToDelete)) == 0 {
-		logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "groups": newGroups})
-		return coreutils.DoneResponse(), nil
+		flowHandle.Unlock()
 	}
 
-	// store the changed data
-	device.Flows = &voltha.Flows{Items: updatedAllFlows}
-	device.FlowGroups = &voltha.FlowGroups{Items: updatedAllGroups}
-	if err := agent.updateDeviceWithoutLock(ctx, device); err != nil {
-		return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-device-%s", agent.deviceID)
+	// Sanity check
+	if (len(flowsToAdd)) == 0 {
+		logger.Debugw("no-flows-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows})
+		return coreutils.DoneResponse(), nil
 	}
 
 	// Send update to adapters
 	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
 	response := coreutils.NewResponse()
 	if !dType.AcceptsAddRemoveFlowUpdates {
-		if len(updatedAllGroups) != 0 && reflect.DeepEqual(existingGroups.Items, updatedAllGroups) && len(updatedAllFlows) != 0 && reflect.DeepEqual(existingFlows.Items, updatedAllFlows) {
-			logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "groups": newGroups})
-			cancel()
-			return coreutils.DoneResponse(), nil
-		}
-		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: updatedAllFlows}, &voltha.FlowGroups{Items: updatedAllGroups}, flowMetadata)
+		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &ofp.Flows{Items: updatedAllFlows}, &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}}, flowMetadata)
 		if err != nil {
 			cancel()
 			return coreutils.DoneResponse(), err
@@ -434,11 +435,102 @@
 		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
 	} else {
 		flowChanges := &ofp.FlowChanges{
-			ToAdd:    &voltha.Flows{Items: newFlows},
+			ToAdd:    &voltha.Flows{Items: flowsToAdd},
 			ToRemove: &voltha.Flows{Items: flowsToDelete},
 		}
 		groupChanges := &ofp.FlowGroupChanges{
-			ToAdd:    &voltha.FlowGroups{Items: newGroups},
+			ToAdd:    &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+			ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+			ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+		}
+		rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
+		if err != nil {
+			cancel()
+			return coreutils.DoneResponse(), err
+		}
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+	}
+	return response, nil
+}
+
+func (agent *Agent) addGroupsToAdapter(ctx context.Context, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
+	logger.Debugw("add-groups-to-adapters", log.Fields{"device-id": agent.deviceID, "groups": newGroups, "flow-metadata": flowMetadata})
+
+	if (len(newGroups)) == 0 {
+		logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": newGroups})
+		return coreutils.DoneResponse(), nil
+	}
+
+	device := agent.getDeviceWithoutLock()
+	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
+	if err != nil {
+		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+	}
+	updatedAllGroups := make([]*ofp.OfpGroupEntry, 0)
+	if !dType.AcceptsAddRemoveFlowUpdates {
+		groupIDs := agent.groupLoader.List()
+		for groupID := range groupIDs {
+			if grpHandle, have := agent.groupLoader.Lock(groupID); have {
+				updatedAllGroups = append(updatedAllGroups, grpHandle.GetReadOnly())
+				grpHandle.Unlock()
+			}
+		}
+	}
+
+	groupsToAdd := make([]*ofp.OfpGroupEntry, 0)
+	groupsToDelete := make([]*ofp.OfpGroupEntry, 0)
+	for _, group := range newGroups {
+
+		groupHandle, created, err := agent.groupLoader.LockOrCreate(ctx, group)
+		if err != nil {
+			return coreutils.DoneResponse(), err
+		}
+
+		if created {
+			groupsToAdd = append(groupsToAdd, group)
+			updatedAllGroups = append(updatedAllGroups, group)
+		} else {
+			groupToChange := groupHandle.GetReadOnly()
+			if !proto.Equal(groupToChange, group) {
+				//Group needs to be updated.
+				if err := groupHandle.Update(ctx, group); err != nil {
+					groupHandle.Unlock()
+					return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-group-%s-to-device-%s", strconv.Itoa(int(group.Desc.GroupId)), agent.deviceID)
+				}
+				groupsToDelete = append(groupsToDelete, groupToChange)
+				groupsToAdd = append(groupsToAdd, group)
+				updatedAllGroups = replaceGroupInList(updatedAllGroups, groupToChange, group)
+			} else {
+				//No need to change the group; it already exists.
+				logger.Debugw("No-need-to-change-already-existing-group", log.Fields{"device-id": agent.deviceID, "group": newGroups, "flow-metadata": flowMetadata})
+			}
+		}
+
+		groupHandle.Unlock()
+	}
+	// Sanity check
+	if (len(groupsToAdd)) == 0 {
+		logger.Debugw("no-groups-to-update", log.Fields{"device-id": agent.deviceID, "groups": newGroups})
+		return coreutils.DoneResponse(), nil
+	}
+
+	// Send update to adapters
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	response := coreutils.NewResponse()
+	if !dType.AcceptsAddRemoveFlowUpdates {
+		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: []*ofp.OfpFlowStats{}}, &voltha.FlowGroups{Items: updatedAllGroups}, flowMetadata)
+		if err != nil {
+			cancel()
+			return coreutils.DoneResponse(), err
+		}
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+	} else {
+		flowChanges := &ofp.FlowChanges{
+			ToAdd:    &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
+			ToRemove: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
+		}
+		groupChanges := &ofp.FlowGroupChanges{
+			ToAdd:    &voltha.FlowGroups{Items: groupsToAdd},
 			ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
 			ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
 		}
@@ -455,88 +547,66 @@
 //addFlowsAndGroups adds the "newFlows" and "newGroups" from the existing flows/groups and sends the update to the
 //adapters
 func (agent *Agent) addFlowsAndGroups(ctx context.Context, newFlows []*ofp.OfpFlowStats, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
-	response, err := agent.addFlowsAndGroupsToAdapter(ctx, newFlows, newGroups, flowMetadata)
-	if err != nil {
+	var flwResponse, grpResponse coreutils.Response
+	var err error
+	//if the new flow list is empty, the called function returns quickly
+	if flwResponse, err = agent.addFlowsToAdapter(ctx, newFlows, flowMetadata); err != nil {
 		return err
 	}
-	if errs := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, response); errs != nil {
+	//if the new group list is empty, the called function returns quickly
+	if grpResponse, err = agent.addGroupsToAdapter(ctx, newGroups, flowMetadata); err != nil {
+		return err
+	}
+	if errs := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, flwResponse, grpResponse); errs != nil {
 		logger.Warnw("no-adapter-response", log.Fields{"device-id": agent.deviceID, "result": errs})
 		return status.Errorf(codes.Aborted, "flow-failure-device-%s", agent.deviceID)
 	}
 	return nil
 }
 
-func (agent *Agent) deleteFlowsAndGroupsFromAdapter(ctx context.Context, flowsToDel []*ofp.OfpFlowStats, groupsToDel []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
-	logger.Debugw("delete-flows-groups-from-adapter", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel, "groups": groupsToDel})
+func (agent *Agent) deleteFlowsFromAdapter(ctx context.Context, flowsToDel []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
+	logger.Debugw("delete-flows-from-adapter", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel})
 
-	if (len(flowsToDel) | len(groupsToDel)) == 0 {
-		logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel, "groups": groupsToDel})
+	if (len(flowsToDel)) == 0 {
+		logger.Debugw("nothing-to-delete", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel})
 		return coreutils.DoneResponse(), nil
 	}
 
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		return coreutils.DoneResponse(), err
-	}
-	defer agent.requestQueue.RequestComplete()
-
 	device := agent.getDeviceWithoutLock()
 	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
 	if err != nil {
 		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
 	}
-
-	existingFlows := proto.Clone(device.Flows).(*voltha.Flows)
-	existingGroups := proto.Clone(device.FlowGroups).(*ofp.FlowGroups)
-
-	var flowsToKeep []*ofp.OfpFlowStats
-	var groupsToKeep []*ofp.OfpGroupEntry
-
-	// Process flows
-	for _, flow := range existingFlows.Items {
-		if idx := fu.FindFlows(flowsToDel, flow); idx == -1 {
-			flowsToKeep = append(flowsToKeep, flow)
+	updatedAllFlows := make([]*ofp.OfpFlowStats, 0)
+	if !dType.AcceptsAddRemoveFlowUpdates {
+		flowIDs := agent.flowLoader.List()
+		for flowID := range flowIDs {
+			if flowHandle, have := agent.flowLoader.Lock(flowID); have {
+				updatedAllFlows = append(updatedAllFlows, flowHandle.GetReadOnly())
+				flowHandle.Unlock()
+			}
 		}
 	}
-
-	// Process groups
-	for _, group := range existingGroups.Items {
-		if fu.FindGroup(groupsToDel, group.Desc.GroupId) == -1 { // does not exist now
-			groupsToKeep = append(groupsToKeep, group)
+	for _, flow := range flowsToDel {
+		if flowHandle, have := agent.flowLoader.Lock(flow.Id); have {
+			// Update the store and cache
+			flowToDelete := flowHandle.GetReadOnly()
+			if err := flowHandle.Delete(ctx); err != nil {
+				flowHandle.Unlock()
+				return coreutils.DoneResponse(), err
+			}
+			if idx := fu.FindFlows(updatedAllFlows, flowToDelete); idx != -1 {
+				updatedAllFlows = deleteFlowWithoutPreservingOrder(updatedAllFlows, idx)
+			}
+			flowHandle.Unlock()
 		}
 	}
 
-	logger.Debugw("deleteFlowsAndGroups",
-		log.Fields{
-			"device-id":      agent.deviceID,
-			"flows-to-del":   len(flowsToDel),
-			"flows-to-keep":  len(flowsToKeep),
-			"groups-to-del":  len(groupsToDel),
-			"groups-to-keep": len(groupsToKeep),
-		})
-
-	// Sanity check
-	if (len(flowsToKeep) | len(flowsToDel) | len(groupsToKeep) | len(groupsToDel)) == 0 {
-		logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows-to-del": flowsToDel, "groups-to-del": groupsToDel})
-		return coreutils.DoneResponse(), nil
-	}
-
-	// store the changed data
-	device.Flows = &voltha.Flows{Items: flowsToKeep}
-	device.FlowGroups = &voltha.FlowGroups{Items: groupsToKeep}
-	if err := agent.updateDeviceWithoutLock(ctx, device); err != nil {
-		return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceID)
-	}
-
 	// Send update to adapters
 	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
 	response := coreutils.NewResponse()
 	if !dType.AcceptsAddRemoveFlowUpdates {
-		if len(groupsToKeep) != 0 && reflect.DeepEqual(existingGroups.Items, groupsToKeep) && len(flowsToKeep) != 0 && reflect.DeepEqual(existingFlows.Items, flowsToKeep) {
-			logger.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceID, "flowsToDel": flowsToDel, "groupsToDel": groupsToDel})
-			cancel()
-			return coreutils.DoneResponse(), nil
-		}
-		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: flowsToKeep}, &voltha.FlowGroups{Items: groupsToKeep}, flowMetadata)
+		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: updatedAllFlows}, &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}}, flowMetadata)
 		if err != nil {
 			cancel()
 			return coreutils.DoneResponse(), err
@@ -549,6 +619,73 @@
 		}
 		groupChanges := &ofp.FlowGroupChanges{
 			ToAdd:    &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+			ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+			ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+		}
+		rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
+		if err != nil {
+			cancel()
+			return coreutils.DoneResponse(), err
+		}
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+	}
+	return response, nil
+}
+
+func (agent *Agent) deleteGroupsFromAdapter(ctx context.Context, groupsToDel []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
+	logger.Debugw("delete-groups-from-adapter", log.Fields{"device-id": agent.deviceID, "groups": groupsToDel})
+
+	if (len(groupsToDel)) == 0 {
+		logger.Debugw("nothing-to-delete", log.Fields{"device-id": agent.deviceID})
+		return coreutils.DoneResponse(), nil
+	}
+	device := agent.getDeviceWithoutLock()
+	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
+	if err != nil {
+		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+	}
+	updatedAllGroups := make([]*ofp.OfpGroupEntry, 0)
+	if !dType.AcceptsAddRemoveFlowUpdates {
+		groupIDs := agent.groupLoader.List()
+		for groupID := range groupIDs {
+			if grpHandle, have := agent.groupLoader.Lock(groupID); have {
+				updatedAllGroups = append(updatedAllGroups, grpHandle.GetReadOnly())
+				grpHandle.Unlock()
+			}
+		}
+	}
+
+	for _, group := range groupsToDel {
+		if groupHandle, have := agent.groupLoader.Lock(group.Desc.GroupId); have {
+			// Update the store and cache
+			if err := groupHandle.Delete(ctx); err != nil {
+				groupHandle.Unlock()
+				return coreutils.DoneResponse(), err
+			}
+			if idx := fu.FindGroup(updatedAllGroups, group.Desc.GroupId); idx != -1 {
+				updatedAllGroups = deleteGroupWithoutPreservingOrder(updatedAllGroups, idx)
+			}
+			groupHandle.Unlock()
+		}
+	}
+
+	// Send update to adapters
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	response := coreutils.NewResponse()
+	if !dType.AcceptsAddRemoveFlowUpdates {
+		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: []*ofp.OfpFlowStats{}}, &voltha.FlowGroups{Items: updatedAllGroups}, flowMetadata)
+		if err != nil {
+			cancel()
+			return coreutils.DoneResponse(), err
+		}
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+	} else {
+		flowChanges := &ofp.FlowChanges{
+			ToAdd:    &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
+			ToRemove: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
+		}
+		groupChanges := &ofp.FlowGroupChanges{
+			ToAdd:    &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
 			ToRemove: &voltha.FlowGroups{Items: groupsToDel},
 			ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
 		}
@@ -565,11 +702,16 @@
 //deleteFlowsAndGroups removes the "flowsToDel" and "groupsToDel" from the existing flows/groups and sends the update to the
 //adapters
 func (agent *Agent) deleteFlowsAndGroups(ctx context.Context, flowsToDel []*ofp.OfpFlowStats, groupsToDel []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
-	response, err := agent.deleteFlowsAndGroupsFromAdapter(ctx, flowsToDel, groupsToDel, flowMetadata)
-	if err != nil {
+	var flwResponse, grpResponse coreutils.Response
+	var err error
+	if flwResponse, err = agent.deleteFlowsFromAdapter(ctx, flowsToDel, flowMetadata); err != nil {
 		return err
 	}
-	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, response); res != nil {
+	if grpResponse, err = agent.deleteGroupsFromAdapter(ctx, groupsToDel, flowMetadata); err != nil {
+		return err
+	}
+
+	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, flwResponse, grpResponse); res != nil {
 		return status.Errorf(codes.Aborted, "errors-%s", res)
 	}
 	return nil
@@ -577,25 +719,24 @@
 
 //filterOutFlows removes flows from a device using the uni-port as filter
 func (agent *Agent) filterOutFlows(ctx context.Context, uniPort uint32, flowMetadata *voltha.FlowMetadata) error {
-	device, err := agent.getDevice(ctx)
-	if err != nil {
-		return err
-	}
-	existingFlows := proto.Clone(device.Flows).(*voltha.Flows)
 	var flowsToDelete []*ofp.OfpFlowStats
-
 	// If an existing flow has the uniPort as an InPort or OutPort or as a Tunnel ID then it needs to be removed
-	for _, flow := range existingFlows.Items {
-		if fu.GetInPort(flow) == uniPort || fu.GetOutPort(flow) == uniPort || fu.GetTunnelId(flow) == uint64(uniPort) {
-			flowsToDelete = append(flowsToDelete, flow)
+	for flowID := range agent.flowLoader.List() {
+		if flowHandle, have := agent.flowLoader.Lock(flowID); have {
+			flow := flowHandle.GetReadOnly()
+			if flow != nil && (fu.GetInPort(flow) == uniPort || fu.GetOutPort(flow) == uniPort || fu.GetTunnelId(flow) == uint64(uniPort)) {
+				flowsToDelete = append(flowsToDelete, flow)
+			}
+			flowHandle.Unlock()
 		}
 	}
+
 	logger.Debugw("flows-to-delete", log.Fields{"device-id": agent.deviceID, "uni-port": uniPort, "flows": flowsToDelete})
 	if len(flowsToDelete) == 0 {
 		return nil
 	}
 
-	response, err := agent.deleteFlowsAndGroupsFromAdapter(ctx, flowsToDelete, []*ofp.OfpGroupEntry{}, flowMetadata)
+	response, err := agent.deleteFlowsFromAdapter(ctx, flowsToDelete, flowMetadata)
 	if err != nil {
 		return err
 	}
@@ -605,19 +746,14 @@
 	return nil
 }
 
-func (agent *Agent) updateFlowsAndGroupsToAdapter(ctx context.Context, updatedFlows []*ofp.OfpFlowStats, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
-	logger.Debugw("updateFlowsAndGroups", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
+func (agent *Agent) updateFlowsToAdapter(ctx context.Context, updatedFlows []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
+	logger.Debugw("updateFlowsToAdapter", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
 
-	if (len(updatedFlows) | len(updatedGroups)) == 0 {
-		logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
+	if (len(updatedFlows)) == 0 {
+		logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
 		return coreutils.DoneResponse(), nil
 	}
 
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		return coreutils.DoneResponse(), err
-	}
-	defer agent.requestQueue.RequestComplete()
-
 	device := agent.getDeviceWithoutLock()
 	if device.OperStatus != voltha.OperStatus_ACTIVE || device.ConnectStatus != voltha.ConnectStatus_REACHABLE || device.AdminState != voltha.AdminState_ENABLED {
 		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "invalid device states")
@@ -626,81 +762,55 @@
 	if err != nil {
 		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
 	}
-
-	existingFlows := proto.Clone(device.Flows).(*voltha.Flows)
-	existingGroups := proto.Clone(device.FlowGroups).(*ofp.FlowGroups)
-
-	if len(updatedGroups) != 0 && reflect.DeepEqual(existingGroups.Items, updatedGroups) && len(updatedFlows) != 0 && reflect.DeepEqual(existingFlows.Items, updatedFlows) {
-		logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
-		return coreutils.DoneResponse(), nil
+	updatedAllFlows := make([]*ofp.OfpFlowStats, 0)
+	if !dType.AcceptsAddRemoveFlowUpdates {
+		flowIDs := agent.flowLoader.List()
+		for flowID := range flowIDs {
+			if flowHandle, have := agent.flowLoader.Lock(flowID); have {
+				updatedAllFlows = append(updatedAllFlows, flowHandle.GetReadOnly())
+				flowHandle.Unlock()
+			}
+		}
 	}
+	flowsToAdd := make([]*ofp.OfpFlowStats, 0)
+	flowsToDelete := make([]*ofp.OfpFlowStats, 0)
 
-	logger.Debugw("updating-flows-and-groups",
-		log.Fields{
-			"device-id":      agent.deviceID,
-			"updated-flows":  updatedFlows,
-			"updated-groups": updatedGroups,
-		})
+	for _, flow := range updatedFlows {
+		if flowHandle, have := agent.flowLoader.Lock(flow.Id); have {
+			flowToDelete := flowHandle.GetReadOnly()
+			// Update the store and cache
+			if err := flowHandle.Update(ctx, flow); err != nil {
+				flowHandle.Unlock()
+				return coreutils.DoneResponse(), err
+			}
 
-	// store the updated data
-	device.Flows = &voltha.Flows{Items: updatedFlows}
-	device.FlowGroups = &voltha.FlowGroups{Items: updatedGroups}
-	if err := agent.updateDeviceWithoutLock(ctx, device); err != nil {
-		return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceID)
+			flowsToDelete = append(flowsToDelete, flowToDelete)
+			flowsToAdd = append(flowsToAdd, flow)
+			updatedAllFlows = replaceFlowInList(updatedAllFlows, flowToDelete, flow)
+			flowHandle.Unlock()
+		}
 	}
 
 	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
 	response := coreutils.NewResponse()
 	// Process bulk flow update differently than incremental update
 	if !dType.AcceptsAddRemoveFlowUpdates {
-		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, nil)
+		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: updatedAllFlows}, &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}}, nil)
 		if err != nil {
 			cancel()
 			return coreutils.DoneResponse(), err
 		}
 		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
 	} else {
-		var flowsToAdd []*ofp.OfpFlowStats
-		var flowsToDelete []*ofp.OfpFlowStats
-		var groupsToAdd []*ofp.OfpGroupEntry
-		var groupsToDelete []*ofp.OfpGroupEntry
-
-		// Process flows
-		for _, flow := range updatedFlows {
-			if idx := fu.FindFlows(existingFlows.Items, flow); idx == -1 {
-				flowsToAdd = append(flowsToAdd, flow)
-			}
-		}
-		for _, flow := range existingFlows.Items {
-			if idx := fu.FindFlows(updatedFlows, flow); idx != -1 {
-				flowsToDelete = append(flowsToDelete, flow)
-			}
-		}
-
-		// Process groups
-		for _, g := range updatedGroups {
-			if fu.FindGroup(existingGroups.Items, g.Desc.GroupId) == -1 { // does not exist now
-				groupsToAdd = append(groupsToAdd, g)
-			}
-		}
-		for _, group := range existingGroups.Items {
-			if fu.FindGroup(updatedGroups, group.Desc.GroupId) != -1 { // does not exist now
-				groupsToDelete = append(groupsToDelete, group)
-			}
-		}
-
 		logger.Debugw("updating-flows-and-groups",
 			log.Fields{
-				"device-id":        agent.deviceID,
-				"flows-to-add":     flowsToAdd,
-				"flows-to-delete":  flowsToDelete,
-				"groups-to-add":    groupsToAdd,
-				"groups-to-delete": groupsToDelete,
+				"device-id":       agent.deviceID,
+				"flows-to-add":    flowsToAdd,
+				"flows-to-delete": flowsToDelete,
 			})
-
 		// Sanity check
-		if (len(flowsToAdd) | len(flowsToDelete) | len(groupsToAdd) | len(groupsToDelete) | len(updatedGroups)) == 0 {
-			logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
+		if (len(flowsToAdd) | len(flowsToDelete)) == 0 {
+			logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
 			cancel()
 			return coreutils.DoneResponse(), nil
 		}
@@ -710,9 +820,94 @@
 			ToRemove: &voltha.Flows{Items: flowsToDelete},
 		}
 		groupChanges := &ofp.FlowGroupChanges{
-			ToAdd:    &voltha.FlowGroups{Items: groupsToAdd},
-			ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
-			ToUpdate: &voltha.FlowGroups{Items: updatedGroups},
+			ToAdd:    &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+			ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+			ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+		}
+		rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
+		if err != nil {
+			cancel()
+			return coreutils.DoneResponse(), err
+		}
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+	}
+
+	return response, nil
+}
+
+func (agent *Agent) updateGroupsToAdapter(ctx context.Context, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
+	logger.Debugw("updateGroupsToAdapter", log.Fields{"device-id": agent.deviceID, "groups": updatedGroups})
+
+	if (len(updatedGroups)) == 0 {
+		logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": updatedGroups})
+		return coreutils.DoneResponse(), nil
+	}
+
+	device := agent.getDeviceWithoutLock()
+	if device.OperStatus != voltha.OperStatus_ACTIVE || device.ConnectStatus != voltha.ConnectStatus_REACHABLE || device.AdminState != voltha.AdminState_ENABLED {
+		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "invalid device states-oper-%s-connect-%s-admin-%s", device.OperStatus, device.ConnectStatus, device.AdminState)
+	}
+	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
+	if err != nil {
+		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+	}
+	updatedAllGroups := make([]*ofp.OfpGroupEntry, 0)
+	if !dType.AcceptsAddRemoveFlowUpdates {
+		groupIDs := agent.groupLoader.List()
+		for groupID := range groupIDs {
+			if grpHandle, have := agent.groupLoader.Lock(groupID); have {
+				updatedAllGroups = append(updatedAllGroups, grpHandle.GetReadOnly())
+				grpHandle.Unlock()
+			}
+		}
+	}
+	groupsToUpdate := make([]*ofp.OfpGroupEntry, 0)
+
+	for _, group := range updatedGroups {
+		if groupHandle, have := agent.groupLoader.Lock(group.Desc.GroupId); have {
+			// Update the store and cache
+			if err := groupHandle.Update(ctx, group); err != nil {
+				groupHandle.Unlock()
+				return coreutils.DoneResponse(), err
+			}
+			groupsToUpdate = append(groupsToUpdate, group)
+			updatedAllGroups = replaceGroupInList(updatedAllGroups, groupHandle.GetReadOnly(), group)
+			groupHandle.Unlock()
+		}
+	}
+
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	response := coreutils.NewResponse()
+	// Process bulk flow update differently than incremental update
+	if !dType.AcceptsAddRemoveFlowUpdates {
+		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: []*ofp.OfpFlowStats{}}, &voltha.FlowGroups{Items: updatedAllGroups}, nil)
+		if err != nil {
+			cancel()
+			return coreutils.DoneResponse(), err
+		}
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+	} else {
+		logger.Debugw("updating-groups",
+			log.Fields{
+				"device-id":        agent.deviceID,
+				"groups-to-update": groupsToUpdate,
+			})
+
+		// Sanity check
+		if (len(groupsToUpdate)) == 0 {
+			logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": groupsToUpdate})
+			cancel()
+			return coreutils.DoneResponse(), nil
+		}
+
+		flowChanges := &ofp.FlowChanges{
+			ToAdd:    &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
+			ToRemove: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
+		}
+		groupChanges := &ofp.FlowGroupChanges{
+			ToAdd:    &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+			ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+			ToUpdate: &voltha.FlowGroups{Items: groupsToUpdate},
 		}
 		rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
 		if err != nil {
@@ -728,11 +923,16 @@
 //updateFlowsAndGroups replaces the existing flows and groups with "updatedFlows" and "updatedGroups" respectively. It
 //also sends the updates to the adapters
 func (agent *Agent) updateFlowsAndGroups(ctx context.Context, updatedFlows []*ofp.OfpFlowStats, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
-	response, err := agent.updateFlowsAndGroupsToAdapter(ctx, updatedFlows, updatedGroups, flowMetadata)
-	if err != nil {
+	var flwResponse, grpResponse coreutils.Response
+	var err error
+	if flwResponse, err = agent.updateFlowsToAdapter(ctx, updatedFlows, flowMetadata); err != nil {
 		return err
 	}
-	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, response); res != nil {
+	if grpResponse, err = agent.updateGroupsToAdapter(ctx, updatedGroups, flowMetadata); err != nil {
+		return err
+	}
+
+	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, flwResponse, grpResponse); res != nil {
 		return status.Errorf(codes.Aborted, "errors-%s", res)
 	}
 	return nil
@@ -741,17 +941,17 @@
 //deleteAllFlows deletes all flows in the device table
 func (agent *Agent) deleteAllFlows(ctx context.Context) error {
 	logger.Debugw("deleteAllFlows", log.Fields{"deviceId": agent.deviceID})
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		return err
-	}
-	defer agent.requestQueue.RequestComplete()
 
-	device := agent.getDeviceWithoutLock()
-	// purge all flows on the device by setting it to nil
-	device.Flows = &ofp.Flows{Items: nil}
-	if err := agent.updateDeviceWithoutLock(ctx, device); err != nil {
-		// The caller logs the error
-		return err
+	for flowID := range agent.flowLoader.List() {
+		if flowHandle, have := agent.flowLoader.Lock(flowID); have {
+			// Update the store and cache
+			if err := flowHandle.Delete(ctx); err != nil {
+				flowHandle.Unlock()
+				logger.Errorw("unable-to-delete-flow", log.Fields{"device-id": agent.deviceID, "flowID": flowID})
+				continue
+			}
+			flowHandle.Unlock()
+		}
 	}
 	return nil
 }
@@ -1296,13 +1496,6 @@
 	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
 }
 
-func (agent *Agent) updateDeviceWithoutLock(ctx context.Context, device *voltha.Device) error {
-	logger.Debugw("updateDevice", log.Fields{"deviceId": device.Id})
-	//cloned := proto.Clone(device).(*voltha.Device)
-	cloned := device
-	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
-}
-
 func (agent *Agent) updateDeviceStatus(ctx context.Context, operStatus voltha.OperStatus_Types, connStatus voltha.ConnectStatus_Types) error {
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
diff --git a/rw_core/core/device/agent_flow.go b/rw_core/core/device/agent_flow.go
new file mode 100644
index 0000000..bb8fe08
--- /dev/null
+++ b/rw_core/core/device/agent_flow.go
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package device
+
+import (
+	ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
+)
+
+// listDeviceFlows returns device flows
+func (agent *Agent) listDeviceFlows() map[uint64]*ofp.OfpFlowStats {
+	flowIDs := agent.flowLoader.List()
+	flows := make(map[uint64]*ofp.OfpFlowStats, len(flowIDs))
+	for flowID := range flowIDs {
+		if flowHandle, have := agent.flowLoader.Lock(flowID); have {
+			flows[flowID] = flowHandle.GetReadOnly()
+			flowHandle.Unlock()
+		}
+	}
+	return flows
+}
diff --git a/rw_core/core/device/agent_group.go b/rw_core/core/device/agent_group.go
new file mode 100644
index 0000000..18ac83a
--- /dev/null
+++ b/rw_core/core/device/agent_group.go
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package device
+
+import (
+	ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
+)
+
+// listDeviceGroups returns device flow groups
+func (agent *Agent) listDeviceGroups() map[uint32]*ofp.OfpGroupEntry {
+	groupIDs := agent.groupLoader.List()
+	groups := make(map[uint32]*ofp.OfpGroupEntry, len(groupIDs))
+	for groupID := range groupIDs {
+		if groupHandle, have := agent.groupLoader.Lock(groupID); have {
+			groups[groupID] = groupHandle.GetReadOnly()
+			groupHandle.Unlock()
+		}
+	}
+	return groups
+}
diff --git a/rw_core/core/device/agent_test.go b/rw_core/core/device/agent_test.go
index ffc2a3b..29d8062 100755
--- a/rw_core/core/device/agent_test.go
+++ b/rw_core/core/device/agent_test.go
@@ -18,13 +18,21 @@
 
 import (
 	"context"
+	"math/rand"
+	"sort"
+	"strings"
+	"sync"
+	"testing"
+	"time"
+
 	"github.com/gogo/protobuf/proto"
 	"github.com/opencord/voltha-go/db/model"
 	"github.com/opencord/voltha-go/rw_core/config"
 	"github.com/opencord/voltha-go/rw_core/core/adapter"
+	cm "github.com/opencord/voltha-go/rw_core/mocks"
+	tst "github.com/opencord/voltha-go/rw_core/test"
 	com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
 	"github.com/opencord/voltha-lib-go/v3/pkg/db"
-	"github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
 	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	mock_etcd "github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd"
@@ -33,24 +41,18 @@
 	"github.com/opencord/voltha-protos/v3/go/voltha"
 	"github.com/phayes/freeport"
 	"github.com/stretchr/testify/assert"
-	"google.golang.org/grpc/codes"
-	"google.golang.org/grpc/status"
-	"math/rand"
-	"sort"
-	"strconv"
-	"strings"
-	"sync"
-	"testing"
-	"time"
 )
 
 type DATest struct {
 	etcdServer       *mock_etcd.EtcdServer
 	deviceMgr        *Manager
 	logicalDeviceMgr *LogicalManager
+	adapterMgr       *adapter.Manager
 	kmp              kafka.InterContainerProxy
 	kClient          kafka.Client
 	kvClientPort     int
+	oltAdapter       *cm.OLTAdapter
+	onuAdapter       *cm.ONUAdapter
 	oltAdapterName   string
 	onuAdapterName   string
 	coreInstanceID   string
@@ -64,7 +66,7 @@
 	test := &DATest{}
 	// Start the embedded etcd server
 	var err error
-	test.etcdServer, test.kvClientPort, err = startEmbeddedEtcdServer("voltha.rwcore.da.test", "voltha.rwcore.da.etcd", "error")
+	test.etcdServer, test.kvClientPort, err = tst.StartEmbeddedEtcdServer("voltha.rwcore.da.test", "voltha.rwcore.da.etcd", "error")
 	if err != nil {
 		logger.Fatal(err)
 	}
@@ -120,7 +122,7 @@
 	}
 	cfg.GrpcPort = grpcPort
 	cfg.GrpcHost = "127.0.0.1"
-	client := setupKVClient(cfg, dat.coreInstanceID)
+	client := tst.SetupKVClient(cfg, dat.coreInstanceID)
 	backend := &db.Backend{
 		Client:                  client,
 		StoreType:               cfg.KVStoreType,
@@ -138,13 +140,18 @@
 
 	endpointMgr := kafka.NewEndpointManager(backend)
 	proxy := model.NewDBPath(backend)
-	adapterMgr := adapter.NewAdapterManager(proxy, dat.coreInstanceID, dat.kClient)
+	dat.adapterMgr = adapter.NewAdapterManager(proxy, dat.coreInstanceID, dat.kClient)
 
-	dat.deviceMgr, dat.logicalDeviceMgr = NewManagers(proxy, adapterMgr, dat.kmp, endpointMgr, cfg.CorePairTopic, dat.coreInstanceID, cfg.DefaultCoreTimeout)
+	dat.deviceMgr, dat.logicalDeviceMgr = NewManagers(proxy, dat.adapterMgr, dat.kmp, endpointMgr, cfg.CorePairTopic, dat.coreInstanceID, cfg.DefaultCoreTimeout)
+	dat.adapterMgr.Start(context.Background())
 	if err = dat.kmp.Start(); err != nil {
 		logger.Fatal("Cannot start InterContainerProxy")
 	}
-	adapterMgr.Start(context.Background())
+
+	if err := dat.kmp.SubscribeWithDefaultRequestHandler(kafka.Topic{Name: cfg.CorePairTopic}, kafka.OffsetNewest); err != nil {
+		logger.Fatalf("Cannot add default request handler: %s", err)
+	}
+
 }
 
 func (dat *DATest) stopAll() {
@@ -155,46 +162,14 @@
 		dat.kmp.Stop()
 	}
 	if dat.etcdServer != nil {
-		stopEmbeddedEtcdServer(dat.etcdServer)
+		tst.StopEmbeddedEtcdServer(dat.etcdServer)
 	}
 }
 
-//startEmbeddedEtcdServer creates and starts an Embedded etcd server locally.
-func startEmbeddedEtcdServer(configName, storageDir, logLevel string) (*mock_etcd.EtcdServer, int, error) {
-	kvClientPort, err := freeport.GetFreePort()
-	if err != nil {
-		return nil, 0, err
-	}
-	peerPort, err := freeport.GetFreePort()
-	if err != nil {
-		return nil, 0, err
-	}
-	etcdServer := mock_etcd.StartEtcdServer(mock_etcd.MKConfig(configName, kvClientPort, peerPort, storageDir, logLevel))
-	if etcdServer == nil {
-		return nil, 0, status.Error(codes.Internal, "Embedded server failed to start")
-	}
-	return etcdServer, kvClientPort, nil
-}
-
-func stopEmbeddedEtcdServer(server *mock_etcd.EtcdServer) {
-	if server != nil {
-		server.Stop()
-	}
-}
-
-func setupKVClient(cf *config.RWCoreFlags, coreInstanceID string) kvstore.Client {
-	addr := cf.KVStoreHost + ":" + strconv.Itoa(cf.KVStorePort)
-	client, err := kvstore.NewEtcdClient(addr, cf.KVStoreTimeout, log.FatalLevel)
-	if err != nil {
-		panic("no kv client")
-	}
-	return client
-}
-
 func (dat *DATest) createDeviceAgent(t *testing.T) *Agent {
 	deviceMgr := dat.deviceMgr
 	clonedDevice := proto.Clone(dat.device).(*voltha.Device)
-	deviceAgent := newAgent(deviceMgr.adapterProxy, clonedDevice, deviceMgr, deviceMgr.dProxy, deviceMgr.defaultTimeout)
+	deviceAgent := newAgent(deviceMgr.adapterProxy, clonedDevice, deviceMgr, deviceMgr.dbPath, deviceMgr.dProxy, deviceMgr.defaultTimeout)
 	d, err := deviceAgent.start(context.TODO(), clonedDevice)
 	assert.Nil(t, err)
 	assert.NotNil(t, d)
@@ -278,7 +253,7 @@
 		da := newDATest()
 		assert.NotNil(t, da)
 		defer da.stopAll()
-
+		log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/core", log.DebugLevel)
 		// Start the Core
 		da.startCore(false)
 
@@ -293,6 +268,37 @@
 		wg.Wait()
 	}
 }
+func TestFlowUpdates(t *testing.T) {
+	da := newDATest()
+	assert.NotNil(t, da)
+	defer da.stopAll()
+
+	log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/core", log.DebugLevel)
+	// Start the Core
+	da.startCore(false)
+	da.oltAdapter, da.onuAdapter = tst.CreateAndregisterAdapters(t, da.kClient, da.coreInstanceID, da.oltAdapterName, da.onuAdapterName, da.adapterMgr)
+
+	a := da.createDeviceAgent(t)
+	cloned := a.getDeviceWithoutLock()
+	err := a.updateDeviceStateInStoreWithoutLock(context.Background(), cloned, voltha.AdminState_ENABLED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVE)
+	assert.Nil(t, err)
+	da.testFlowAddDeletes(t, a)
+}
+
+func TestGroupUpdates(t *testing.T) {
+	da := newDATest()
+	assert.NotNil(t, da)
+	defer da.stopAll()
+
+	// Start the Core
+	da.startCore(false)
+	da.oltAdapter, da.onuAdapter = tst.CreateAndregisterAdapters(t, da.kClient, da.coreInstanceID, da.oltAdapterName, da.onuAdapterName, da.adapterMgr)
+	a := da.createDeviceAgent(t)
+	cloned := a.getDeviceWithoutLock()
+	err := a.updateDeviceStateInStoreWithoutLock(context.Background(), cloned, voltha.AdminState_ENABLED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVE)
+	assert.Nil(t, err)
+	da.testGroupAddDeletes(t, a)
+}
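
TestFlowUpdates and TestGroupUpdates share the same scaffolding: start the core, register the mock adapters, create a device agent and mark its device enabled. A hypothetical helper such as the one below (the name setupEnabledDeviceAgent is illustrative and not part of this change) could factor that out; it assumes the same test file and imports as the tests above:

    func (dat *DATest) setupEnabledDeviceAgent(t *testing.T) *Agent {
        // start the core and register the mock OLT/ONU adapters
        dat.startCore(false)
        dat.oltAdapter, dat.onuAdapter = tst.CreateAndregisterAdapters(t, dat.kClient, dat.coreInstanceID, dat.oltAdapterName, dat.onuAdapterName, dat.adapterMgr)
        // create a device agent and mark its device as enabled, reachable and active
        a := dat.createDeviceAgent(t)
        cloned := a.getDeviceWithoutLock()
        err := a.updateDeviceStateInStoreWithoutLock(context.Background(), cloned, voltha.AdminState_ENABLED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVE)
        assert.Nil(t, err)
        return a
    }
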
 
 func isFlowSliceEqual(a, b []*ofp.OfpFlowStats) bool {
 	if len(a) != len(b) {
@@ -329,196 +335,214 @@
 	}
 	return true
 }
-
-func TestFlowsToUpdateToDelete_EmptySlices(t *testing.T) {
-	newFlows := []*ofp.OfpFlowStats{}
-	existingFlows := []*ofp.OfpFlowStats{}
-	expectedNewFlows := []*ofp.OfpFlowStats{}
-	expectedFlowsToDelete := []*ofp.OfpFlowStats{}
-	expectedUpdatedAllFlows := []*ofp.OfpFlowStats{}
-	uNF, fD, uAF := flowsToUpdateToDelete(newFlows, existingFlows)
-	assert.True(t, isFlowSliceEqual(uNF, expectedNewFlows))
-	assert.True(t, isFlowSliceEqual(fD, expectedFlowsToDelete))
-	assert.True(t, isFlowSliceEqual(uAF, expectedUpdatedAllFlows))
+func changeToFlowList(flowList map[uint64]*ofp.OfpFlowStats) []*ofp.OfpFlowStats {
+	flows := make([]*ofp.OfpFlowStats, 0)
+	for _, flow := range flowList {
+		flows = append(flows, flow)
+	}
+	return flows
 }
-
-func TestFlowsToUpdateToDelete_NoExistingFlows(t *testing.T) {
+func changeToGroupList(groupList map[uint32]*ofp.OfpGroupEntry) []*ofp.OfpGroupEntry {
+	groups := make([]*ofp.OfpGroupEntry, 0)
+	for _, group := range groupList {
+		groups = append(groups, group)
+	}
+	return groups
+}
+func (dat *DATest) testFlowAddDeletes(t *testing.T, da *Agent) {
+	//Add new Flows on empty list
 	newFlows := []*ofp.OfpFlowStats{
 		{Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
 		{Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
 		{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
 	}
-	existingFlows := []*ofp.OfpFlowStats{}
-	expectedNewFlows := []*ofp.OfpFlowStats{
-		{Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
-		{Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
-		{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
-	}
-	expectedFlowsToDelete := []*ofp.OfpFlowStats{}
-	expectedUpdatedAllFlows := []*ofp.OfpFlowStats{
-		{Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
-		{Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
-		{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
-	}
-	uNF, fD, uAF := flowsToUpdateToDelete(newFlows, existingFlows)
-	assert.True(t, isFlowSliceEqual(uNF, expectedNewFlows))
-	assert.True(t, isFlowSliceEqual(fD, expectedFlowsToDelete))
-	assert.True(t, isFlowSliceEqual(uAF, expectedUpdatedAllFlows))
-}
+	err := da.addFlowsAndGroups(context.Background(), newFlows, []*ofp.OfpGroupEntry{}, &voltha.FlowMetadata{})
+	assert.Nil(t, err)
+	daFlows := changeToFlowList(da.listDeviceFlows())
+	assert.True(t, isFlowSliceEqual(newFlows, daFlows))
 
-func TestFlowsToUpdateToDelete_UpdateNoDelete(t *testing.T) {
-	newFlows := []*ofp.OfpFlowStats{
-		{Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
-		{Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
-		{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
-	}
-	existingFlows := []*ofp.OfpFlowStats{
-		{Id: 121, TableId: 1210, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1210000, PacketCount: 0},
-		{Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
-		{Id: 122, TableId: 1220, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1220000, PacketCount: 0},
-	}
-	expectedNewFlows := []*ofp.OfpFlowStats{
-		{Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
-		{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
-	}
-	expectedFlowsToDelete := []*ofp.OfpFlowStats{}
-	expectedUpdatedAllFlows := []*ofp.OfpFlowStats{
-		{Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
-		{Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
-		{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
-		{Id: 121, TableId: 1210, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1210000, PacketCount: 0},
-		{Id: 122, TableId: 1220, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1220000, PacketCount: 0},
-	}
-	uNF, fD, uAF := flowsToUpdateToDelete(newFlows, existingFlows)
-	assert.True(t, isFlowSliceEqual(uNF, expectedNewFlows))
-	assert.True(t, isFlowSliceEqual(fD, expectedFlowsToDelete))
-	assert.True(t, isFlowSliceEqual(uAF, expectedUpdatedAllFlows))
-}
-
-func TestFlowsToUpdateToDelete_UpdateAndDelete(t *testing.T) {
-	newFlows := []*ofp.OfpFlowStats{
-		{Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 20},
-		{Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
-		{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 10, Flags: 0, Cookie: 1250000, PacketCount: 0},
-		{Id: 126, TableId: 1260, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1260000, PacketCount: 0},
+	//Add new Flows on existing ones
+	newFlows = []*ofp.OfpFlowStats{
+		{Id: 126, TableId: 1260, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1260000, PacketCount: 0},
 		{Id: 127, TableId: 1270, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1270000, PacketCount: 0},
 	}
-	existingFlows := []*ofp.OfpFlowStats{
-		{Id: 121, TableId: 1210, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1210000, PacketCount: 0},
-		{Id: 122, TableId: 1220, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1220000, PacketCount: 0},
+
+	expectedFlows := []*ofp.OfpFlowStats{
+		{Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
+		{Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
+		{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
+		{Id: 126, TableId: 1260, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1260000, PacketCount: 0},
+		{Id: 127, TableId: 1270, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1270000, PacketCount: 0},
+	}
+
+	err = da.addFlowsAndGroups(context.Background(), newFlows, []*ofp.OfpGroupEntry{}, &voltha.FlowMetadata{})
+	assert.Nil(t, err)
+	daFlows = changeToFlowList(da.listDeviceFlows())
+	assert.True(t, isFlowSliceEqual(expectedFlows, daFlows))
+
+	//Re-add existing flows (one with a modified cookie) together with a new flow
+	newFlows = []*ofp.OfpFlowStats{
+		{Id: 126, TableId: 1260, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1260000, PacketCount: 0},
+		{Id: 127, TableId: 1270, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1270001, PacketCount: 0},
+		{Id: 128, TableId: 1280, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1280000, PacketCount: 0},
+	}
+
+	expectedFlows = []*ofp.OfpFlowStats{
+		{Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
+		{Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
+		{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
+		{Id: 126, TableId: 1260, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1260000, PacketCount: 0},
+		{Id: 127, TableId: 1270, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1270001, PacketCount: 0},
+		{Id: 128, TableId: 1280, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1280000, PacketCount: 0},
+	}
+
+	err = da.addFlowsAndGroups(context.Background(), newFlows, []*ofp.OfpGroupEntry{}, &voltha.FlowMetadata{})
+	assert.Nil(t, err)
+	daFlows = changeToFlowList(da.listDeviceFlows())
+	assert.True(t, isFlowSliceEqual(expectedFlows, daFlows))
+
+	//Add already existing flows again
+	newFlows = []*ofp.OfpFlowStats{
+		{Id: 126, TableId: 1260, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1260000, PacketCount: 0},
+		{Id: 127, TableId: 1270, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1270001, PacketCount: 0},
+		{Id: 128, TableId: 1280, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1280000, PacketCount: 0},
+	}
+
+	expectedFlows = []*ofp.OfpFlowStats{
+		{Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
+		{Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
+		{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
+		{Id: 126, TableId: 1260, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1260000, PacketCount: 0},
+		{Id: 127, TableId: 1270, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1270001, PacketCount: 0},
+		{Id: 128, TableId: 1280, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1280000, PacketCount: 0},
+	}
+
+	err = da.addFlowsAndGroups(context.Background(), newFlows, []*ofp.OfpGroupEntry{}, &voltha.FlowMetadata{})
+	assert.Nil(t, err)
+	daFlows = changeToFlowList(da.listDeviceFlows())
+	assert.True(t, isFlowSliceEqual(expectedFlows, daFlows))
+
+	//Delete flows
+	flowsToDelete := []*ofp.OfpFlowStats{
+		{Id: 126, TableId: 1260, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1260000, PacketCount: 0},
+		{Id: 127, TableId: 1270, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1270001, PacketCount: 0},
+		{Id: 128, TableId: 1280, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1280000, PacketCount: 0},
+	}
+
+	expectedFlows = []*ofp.OfpFlowStats{
 		{Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
 		{Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
 		{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
 	}
-	expectedNewFlows := []*ofp.OfpFlowStats{
-		{Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 20},
-		{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 10, Flags: 0, Cookie: 1250000, PacketCount: 0},
-		{Id: 126, TableId: 1260, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1260000, PacketCount: 0},
-		{Id: 127, TableId: 1270, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1270000, PacketCount: 0},
-	}
-	expectedFlowsToDelete := []*ofp.OfpFlowStats{
-		{Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
+
+	err = da.deleteFlowsAndGroups(context.Background(), flowsToDelete, []*ofp.OfpGroupEntry{}, &voltha.FlowMetadata{})
+	assert.Nil(t, err)
+	daFlows = changeToFlowList(da.listDeviceFlows())
+	assert.True(t, isFlowSliceEqual(expectedFlows, daFlows))
+	//Delete flows, including one that does not exist
+	flowsToDelete = []*ofp.OfpFlowStats{
 		{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
+		{Id: 129, TableId: 1290, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1290000, PacketCount: 0},
 	}
-	expectedUpdatedAllFlows := []*ofp.OfpFlowStats{
-		{Id: 121, TableId: 1210, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1210000, PacketCount: 0},
-		{Id: 122, TableId: 1220, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1220000, PacketCount: 0},
-		{Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 20},
+
+	expectedFlows = []*ofp.OfpFlowStats{
+		{Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
 		{Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
-		{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 10, Flags: 0, Cookie: 1250000, PacketCount: 0},
-		{Id: 126, TableId: 1260, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1260000, PacketCount: 0},
-		{Id: 127, TableId: 1270, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1270000, PacketCount: 0},
 	}
-	uNF, fD, uAF := flowsToUpdateToDelete(newFlows, existingFlows)
-	assert.True(t, isFlowSliceEqual(uNF, expectedNewFlows))
-	assert.True(t, isFlowSliceEqual(fD, expectedFlowsToDelete))
-	assert.True(t, isFlowSliceEqual(uAF, expectedUpdatedAllFlows))
+
+	err = da.deleteFlowsAndGroups(context.Background(), flowsToDelete, []*ofp.OfpGroupEntry{}, &voltha.FlowMetadata{})
+	assert.Nil(t, err)
+	daFlows = changeToFlowList(da.listDeviceFlows())
+	assert.True(t, isFlowSliceEqual(expectedFlows, daFlows))
 }
 
-func TestGroupsToUpdateToDelete_EmptySlices(t *testing.T) {
-	newGroups := []*ofp.OfpGroupEntry{}
-	existingGroups := []*ofp.OfpGroupEntry{}
-	expectedNewGroups := []*ofp.OfpGroupEntry{}
-	expectedGroupsToDelete := []*ofp.OfpGroupEntry{}
-	expectedUpdatedAllGroups := []*ofp.OfpGroupEntry{}
-	uNG, gD, uAG := groupsToUpdateToDelete(newGroups, existingGroups)
-	assert.True(t, isGroupSliceEqual(uNG, expectedNewGroups))
-	assert.True(t, isGroupSliceEqual(gD, expectedGroupsToDelete))
-	assert.True(t, isGroupSliceEqual(uAG, expectedUpdatedAllGroups))
-}
-
-func TestGroupsToUpdateToDelete_NoExistingGroups(t *testing.T) {
+func (dat *DATest) testGroupAddDeletes(t *testing.T, da *Agent) {
+	//Add new Groups on empty list
 	newGroups := []*ofp.OfpGroupEntry{
 		{Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
 		{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
 	}
-	existingGroups := []*ofp.OfpGroupEntry{}
-	expectedNewGroups := []*ofp.OfpGroupEntry{
-		{Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
-		{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
-	}
-	expectedGroupsToDelete := []*ofp.OfpGroupEntry{}
-	expectedUpdatedAllGroups := []*ofp.OfpGroupEntry{
-		{Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
-		{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
-	}
-	uNG, gD, uAG := groupsToUpdateToDelete(newGroups, existingGroups)
-	assert.True(t, isGroupSliceEqual(uNG, expectedNewGroups))
-	assert.True(t, isGroupSliceEqual(gD, expectedGroupsToDelete))
-	assert.True(t, isGroupSliceEqual(uAG, expectedUpdatedAllGroups))
-}
+	err := da.addFlowsAndGroups(context.Background(), []*ofp.OfpFlowStats{}, newGroups, &voltha.FlowMetadata{})
+	assert.Nil(t, err)
+	daGroups := changeToGroupList(da.listDeviceGroups())
+	assert.True(t, isGroupSliceEqual(newGroups, daGroups))
 
-func TestGroupsToUpdateToDelete_UpdateNoDelete(t *testing.T) {
-	newGroups := []*ofp.OfpGroupEntry{
-		{Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
-		{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
-	}
-	existingGroups := []*ofp.OfpGroupEntry{
-		{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
+	//Add new Groups on existing ones
+	newGroups = []*ofp.OfpGroupEntry{
 		{Desc: &ofp.OfpGroupDesc{Type: 3, GroupId: 30, Buckets: nil}},
 		{Desc: &ofp.OfpGroupDesc{Type: 4, GroupId: 40, Buckets: nil}},
 	}
-	expectedNewGroups := []*ofp.OfpGroupEntry{
-		{Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
-	}
-	expectedGroupsToDelete := []*ofp.OfpGroupEntry{}
-	expectedUpdatedAllGroups := []*ofp.OfpGroupEntry{
+	expectedGroups := []*ofp.OfpGroupEntry{
 		{Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
 		{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
 		{Desc: &ofp.OfpGroupDesc{Type: 3, GroupId: 30, Buckets: nil}},
 		{Desc: &ofp.OfpGroupDesc{Type: 4, GroupId: 40, Buckets: nil}},
 	}
-	uNG, gD, uAG := groupsToUpdateToDelete(newGroups, existingGroups)
-	assert.True(t, isGroupSliceEqual(uNG, expectedNewGroups))
-	assert.True(t, isGroupSliceEqual(gD, expectedGroupsToDelete))
-	assert.True(t, isGroupSliceEqual(uAG, expectedUpdatedAllGroups))
-}
+	err = da.addFlowsAndGroups(context.Background(), []*ofp.OfpFlowStats{}, newGroups, &voltha.FlowMetadata{})
+	assert.Nil(t, err)
+	daGroups = changeToGroupList(da.listDeviceGroups())
+	assert.True(t, isGroupSliceEqual(expectedGroups, daGroups))
 
-func TestGroupsToUpdateToDelete_UpdateWithDelete(t *testing.T) {
-	newGroups := []*ofp.OfpGroupEntry{
-		{Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
-		{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: []*ofp.OfpBucket{{WatchPort: 10}}}},
+	//Add new groups, one of which replaces the existing entry for GroupId 40
+	newGroups = []*ofp.OfpGroupEntry{
+		{Desc: &ofp.OfpGroupDesc{Type: 3, GroupId: 30, Buckets: nil}},
+		{Desc: &ofp.OfpGroupDesc{Type: 44, GroupId: 40, Buckets: nil}},
+		{Desc: &ofp.OfpGroupDesc{Type: 5, GroupId: 50, Buckets: nil}},
 	}
-	existingGroups := []*ofp.OfpGroupEntry{
+	expectedGroups = []*ofp.OfpGroupEntry{
+		{Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
 		{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
 		{Desc: &ofp.OfpGroupDesc{Type: 3, GroupId: 30, Buckets: nil}},
-		{Desc: &ofp.OfpGroupDesc{Type: 4, GroupId: 40, Buckets: nil}},
+		{Desc: &ofp.OfpGroupDesc{Type: 44, GroupId: 40, Buckets: nil}},
+		{Desc: &ofp.OfpGroupDesc{Type: 5, GroupId: 50, Buckets: nil}},
 	}
-	expectedNewGroups := []*ofp.OfpGroupEntry{
+	err = da.addFlowsAndGroups(context.Background(), []*ofp.OfpFlowStats{}, newGroups, &voltha.FlowMetadata{})
+	assert.Nil(t, err)
+	daGroups = changeToGroupList(da.listDeviceGroups())
+	assert.True(t, isGroupSliceEqual(expectedGroups, daGroups))
+
+	//Modify Group
+	updtGroups := []*ofp.OfpGroupEntry{
+		{Desc: &ofp.OfpGroupDesc{Type: 33, GroupId: 30, Buckets: nil}},
+	}
+	expectedGroups = []*ofp.OfpGroupEntry{
 		{Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
-		{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: []*ofp.OfpBucket{{WatchPort: 10}}}},
-	}
-	expectedGroupsToDelete := []*ofp.OfpGroupEntry{
 		{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
+		{Desc: &ofp.OfpGroupDesc{Type: 33, GroupId: 30, Buckets: nil}},
+		{Desc: &ofp.OfpGroupDesc{Type: 44, GroupId: 40, Buckets: nil}},
+		{Desc: &ofp.OfpGroupDesc{Type: 5, GroupId: 50, Buckets: nil}},
 	}
-	expectedUpdatedAllGroups := []*ofp.OfpGroupEntry{
+	err = da.updateFlowsAndGroups(context.Background(), []*ofp.OfpFlowStats{}, updtGroups, &voltha.FlowMetadata{})
+	assert.Nil(t, err)
+	daGroups = changeToGroupList(da.listDeviceGroups())
+	assert.True(t, isGroupSliceEqual(expectedGroups, daGroups))
+
+	//Delete Group
+	delGroups := []*ofp.OfpGroupEntry{
+		{Desc: &ofp.OfpGroupDesc{Type: 33, GroupId: 30, Buckets: nil}},
+	}
+	expectedGroups = []*ofp.OfpGroupEntry{
 		{Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
-		{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: []*ofp.OfpBucket{{WatchPort: 10}}}},
-		{Desc: &ofp.OfpGroupDesc{Type: 3, GroupId: 30, Buckets: nil}},
+		{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
+		{Desc: &ofp.OfpGroupDesc{Type: 44, GroupId: 40, Buckets: nil}},
+		{Desc: &ofp.OfpGroupDesc{Type: 5, GroupId: 50, Buckets: nil}},
+	}
+	err = da.deleteFlowsAndGroups(context.Background(), []*ofp.OfpFlowStats{}, delGroups, &voltha.FlowMetadata{})
+	assert.Nil(t, err)
+	daGroups = changeToGroupList(da.listDeviceGroups())
+	assert.True(t, isGroupSliceEqual(expectedGroups, daGroups))
+
+	//Delete a group whose stored entry has since changed (deletion is keyed on GroupId)
+	delGroups = []*ofp.OfpGroupEntry{
 		{Desc: &ofp.OfpGroupDesc{Type: 4, GroupId: 40, Buckets: nil}},
 	}
-	uNG, gD, uAG := groupsToUpdateToDelete(newGroups, existingGroups)
-	assert.True(t, isGroupSliceEqual(uNG, expectedNewGroups))
-	assert.True(t, isGroupSliceEqual(gD, expectedGroupsToDelete))
-	assert.True(t, isGroupSliceEqual(uAG, expectedUpdatedAllGroups))
+	expectedGroups = []*ofp.OfpGroupEntry{
+		{Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
+		{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
+		{Desc: &ofp.OfpGroupDesc{Type: 5, GroupId: 50, Buckets: nil}},
+	}
+	err = da.deleteFlowsAndGroups(context.Background(), []*ofp.OfpFlowStats{}, delGroups, &voltha.FlowMetadata{})
+	assert.Nil(t, err)
+	daGroups = changeToGroupList(da.listDeviceGroups())
+	assert.True(t, isGroupSliceEqual(expectedGroups, daGroups))
 }
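
The assertions in these tests rely on order-insensitive slice comparison, because listDeviceFlows and listDeviceGroups return maps whose iteration order is not deterministic. The group-side helper is defined elsewhere in this file; one plausible shape, shown only for reference and assuming proto and sort are imported in this test file, is:

    // groupSlicesMatch is an illustrative, order-insensitive comparison of group slices.
    func groupSlicesMatch(a, b []*ofp.OfpGroupEntry) bool {
        if len(a) != len(b) {
            return false
        }
        // compare copies sorted by GroupId so the caller's ordering does not matter
        as := append([]*ofp.OfpGroupEntry(nil), a...)
        bs := append([]*ofp.OfpGroupEntry(nil), b...)
        sort.Slice(as, func(i, j int) bool { return as[i].Desc.GroupId < as[j].Desc.GroupId })
        sort.Slice(bs, func(i, j int) bool { return bs[i].Desc.GroupId < bs[j].Desc.GroupId })
        for i := range as {
            if !proto.Equal(as[i], bs[i]) {
                return false
            }
        }
        return true
    }
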
diff --git a/rw_core/core/device/logical_agent_test.go b/rw_core/core/device/logical_agent_test.go
index eb65673..77c4b7c 100644
--- a/rw_core/core/device/logical_agent_test.go
+++ b/rw_core/core/device/logical_agent_test.go
@@ -26,6 +26,7 @@
 	"github.com/opencord/voltha-go/db/model"
 	"github.com/opencord/voltha-go/rw_core/config"
 	"github.com/opencord/voltha-go/rw_core/core/adapter"
+	tst "github.com/opencord/voltha-go/rw_core/test"
 	com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
 	"github.com/opencord/voltha-lib-go/v3/pkg/db"
 	fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
@@ -381,7 +382,7 @@
 	test := &LDATest{}
 	// Start the embedded etcd server
 	var err error
-	test.etcdServer, test.kvClientPort, err = startEmbeddedEtcdServer("voltha.rwcore.lda.test", "voltha.rwcore.lda.etcd", "error")
+	test.etcdServer, test.kvClientPort, err = tst.StartEmbeddedEtcdServer("voltha.rwcore.lda.test", "voltha.rwcore.lda.etcd", "error")
 	if err != nil {
 		logger.Fatal(err)
 	}
@@ -463,7 +464,7 @@
 	}
 	cfg.GrpcPort = grpcPort
 	cfg.GrpcHost = "127.0.0.1"
-	client := setupKVClient(cfg, lda.coreInstanceID)
+	client := tst.SetupKVClient(cfg, lda.coreInstanceID)
 	backend := &db.Backend{
 		Client:                  client,
 		StoreType:               cfg.KVStoreType,
@@ -498,7 +499,7 @@
 		lda.kmp.Stop()
 	}
 	if lda.etcdServer != nil {
-		stopEmbeddedEtcdServer(lda.etcdServer)
+		tst.StopEmbeddedEtcdServer(lda.etcdServer)
 	}
 }
 
@@ -508,7 +509,7 @@
 	clonedLD := proto.Clone(lda.logicalDevice).(*voltha.LogicalDevice)
 	clonedLD.Id = com.GetRandomString(10)
 	clonedLD.DatapathId = rand.Uint64()
-	lDeviceAgent := newLogicalAgent(clonedLD.Id, clonedLD.Id, clonedLD.RootDeviceId, lDeviceMgr, deviceMgr, lDeviceMgr.dbProxy, lDeviceMgr.ldProxy, lDeviceMgr.defaultTimeout)
+	lDeviceAgent := newLogicalAgent(clonedLD.Id, clonedLD.Id, clonedLD.RootDeviceId, lDeviceMgr, deviceMgr, lDeviceMgr.dbPath, lDeviceMgr.ldProxy, lDeviceMgr.defaultTimeout)
 	lDeviceAgent.logicalDevice = clonedLD
 	err := lDeviceAgent.ldProxy.Set(context.Background(), clonedLD.Id, clonedLD)
 	assert.Nil(t, err)
diff --git a/rw_core/core/device/logical_manager.go b/rw_core/core/device/logical_manager.go
index f9bff21..d6ca4ce 100644
--- a/rw_core/core/device/logical_manager.go
+++ b/rw_core/core/device/logical_manager.go
@@ -42,7 +42,7 @@
 	logicalDeviceAgents            sync.Map
 	deviceMgr                      *Manager
 	kafkaICProxy                   kafka.InterContainerProxy
-	dbProxy                        *model.Path
+	dbPath                         *model.Path
 	ldProxy                        *model.Proxy
 	defaultTimeout                 time.Duration
 	logicalDevicesLoadingLock      sync.RWMutex
@@ -126,7 +126,7 @@
 
 	logger.Debugw("logical-device-id", log.Fields{"logicaldeviceId": id})
 
-	agent := newLogicalAgent(id, sn, device.Id, ldMgr, ldMgr.deviceMgr, ldMgr.dbProxy, ldMgr.ldProxy, ldMgr.defaultTimeout)
+	agent := newLogicalAgent(id, sn, device.Id, ldMgr, ldMgr.deviceMgr, ldMgr.dbPath, ldMgr.ldProxy, ldMgr.defaultTimeout)
 	ldMgr.addLogicalDeviceAgentToMap(agent)
 
 	// Update the root device with the logical device Id reference
@@ -198,7 +198,7 @@
 			ldMgr.logicalDevicesLoadingLock.Unlock()
 			if _, err := ldMgr.getLogicalDeviceFromModel(ctx, lDeviceID); err == nil {
 				logger.Debugw("loading-logical-device", log.Fields{"lDeviceId": lDeviceID})
-				agent := newLogicalAgent(lDeviceID, "", "", ldMgr, ldMgr.deviceMgr, ldMgr.dbProxy, ldMgr.ldProxy, ldMgr.defaultTimeout)
+				agent := newLogicalAgent(lDeviceID, "", "", ldMgr, ldMgr.deviceMgr, ldMgr.dbPath, ldMgr.ldProxy, ldMgr.defaultTimeout)
 				if err := agent.start(ctx, true); err != nil {
 					return err
 				}
diff --git a/rw_core/core/device/manager.go b/rw_core/core/device/manager.go
index 357c49a..17ac266 100755
--- a/rw_core/core/device/manager.go
+++ b/rw_core/core/device/manager.go
@@ -34,6 +34,7 @@
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	"github.com/opencord/voltha-protos/v3/go/common"
 	ic "github.com/opencord/voltha-protos/v3/go/inter_container"
+	"github.com/opencord/voltha-protos/v3/go/openflow_13"
 	ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
 	"google.golang.org/grpc/codes"
@@ -50,6 +51,7 @@
 	logicalDeviceMgr        *LogicalManager
 	kafkaICProxy            kafka.InterContainerProxy
 	stateTransitions        *TransitionMap
+	dbPath                  *model.Path
 	dProxy                  *model.Proxy
 	coreInstanceID          string
 	defaultTimeout          time.Duration
@@ -57,13 +59,15 @@
 	deviceLoadingInProgress map[string][]chan int
 }
 
-func NewManagers(dbProxy *model.Path, adapterMgr *adapter.Manager, kmp kafka.InterContainerProxy, endpointMgr kafka.EndpointManager, corePairTopic, coreInstanceID string, defaultCoreTimeout time.Duration) (*Manager, *LogicalManager) {
+//NewManagers creates the Manager and the Logical Manager.
+func NewManagers(dbPath *model.Path, adapterMgr *adapter.Manager, kmp kafka.InterContainerProxy, endpointMgr kafka.EndpointManager, corePairTopic, coreInstanceID string, defaultCoreTimeout time.Duration) (*Manager, *LogicalManager) {
 	deviceMgr := &Manager{
 		rootDevices:             make(map[string]bool),
 		kafkaICProxy:            kmp,
 		adapterProxy:            remote.NewAdapterProxy(kmp, corePairTopic, endpointMgr),
 		coreInstanceID:          coreInstanceID,
-		dProxy:                  dbProxy.Proxy("devices"),
+		dbPath:                  dbPath,
+		dProxy:                  dbPath.Proxy("devices"),
 		adapterMgr:              adapterMgr,
 		defaultTimeout:          defaultCoreTimeout * time.Millisecond,
 		deviceLoadingInProgress: make(map[string][]chan int),
@@ -74,8 +78,8 @@
 		Manager:                        event.NewManager(),
 		deviceMgr:                      deviceMgr,
 		kafkaICProxy:                   kmp,
-		dbProxy:                        dbProxy,
-		ldProxy:                        dbProxy.Proxy("logical_devices"),
+		dbPath:                         dbPath,
+		ldProxy:                        dbPath.Proxy("logical_devices"),
 		defaultTimeout:                 defaultCoreTimeout,
 		logicalDeviceLoadingInProgress: make(map[string][]chan int),
 	}
@@ -157,7 +161,7 @@
 	// Ensure this device is set as root
 	device.Root = true
 	// Create and start a device agent for that device
-	agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.dProxy, dMgr.defaultTimeout)
+	agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.dbPath, dMgr.dProxy, dMgr.defaultTimeout)
 	device, err = agent.start(ctx, device)
 	if err != nil {
 		logger.Errorw("Fail-to-start-device", log.Fields{"device-id": agent.deviceID, "error": err})
@@ -220,22 +224,34 @@
 // ListDeviceFlows returns the flow details for a specific device entry
 func (dMgr *Manager) ListDeviceFlows(ctx context.Context, id *voltha.ID) (*ofp.Flows, error) {
 	logger.Debugw("ListDeviceFlows", log.Fields{"device-id": id.Id})
-	device, err := dMgr.getDevice(ctx, id.Id)
-	if err != nil {
-		return &ofp.Flows{}, err
+	agent := dMgr.getDeviceAgent(ctx, id.Id)
+	if agent == nil {
+		return &ofp.Flows{}, status.Errorf(codes.NotFound, "device-%s", id.Id)
 	}
-	return device.Flows, nil
+
+	flows := agent.listDeviceFlows()
+	ctr, ret := 0, make([]*ofp.OfpFlowStats, len(flows))
+	for _, flow := range flows {
+		ret[ctr] = flow
+		ctr++
+	}
+	return &openflow_13.Flows{Items: ret}, nil
 }
 
 // ListDeviceFlowGroups returns the flow group details for a specific device entry
 func (dMgr *Manager) ListDeviceFlowGroups(ctx context.Context, id *voltha.ID) (*voltha.FlowGroups, error) {
 	logger.Debugw("ListDeviceFlowGroups", log.Fields{"device-id": id.Id})
-
-	device, err := dMgr.getDevice(ctx, id.Id)
-	if err != nil {
+	agent := dMgr.getDeviceAgent(ctx, id.Id)
+	if agent == nil {
 		return nil, status.Errorf(codes.NotFound, "device-%s", id.Id)
 	}
-	return device.GetFlowGroups(), nil
+	groups := agent.listDeviceGroups()
+	ctr, ret := 0, make([]*openflow_13.OfpGroupEntry, len(groups))
+	for _, group := range groups {
+		ret[ctr] = group
+		ctr++
+	}
+	return &voltha.FlowGroups{Items: ret}, nil
 }
 
 // stopManagingDevice stops the management of the device as well as any of its reference device and logical device.
@@ -408,7 +424,7 @@
 		// If device is not in memory then set it up
 		if !dMgr.IsDeviceInCache(device.Id) {
 			logger.Debugw("loading-device-from-Model", log.Fields{"id": device.Id})
-			agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.dProxy, dMgr.defaultTimeout)
+			agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.dbPath, dMgr.dProxy, dMgr.defaultTimeout)
 			if _, err := agent.start(ctx, nil); err != nil {
 				logger.Warnw("failure-starting-agent", log.Fields{"deviceId": device.Id})
 			} else {
@@ -471,7 +487,7 @@
 			// Proceed with the loading only if the device exist in the Model (could have been deleted)
 			if device, err = dMgr.getDeviceFromModel(ctx, deviceID); err == nil {
 				logger.Debugw("loading-device", log.Fields{"deviceId": deviceID})
-				agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.dProxy, dMgr.defaultTimeout)
+				agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.dbPath, dMgr.dProxy, dMgr.defaultTimeout)
 				if _, err = agent.start(ctx, nil); err != nil {
 					logger.Warnw("Failure loading device", log.Fields{"deviceId": deviceID, "error": err})
 				} else {
@@ -1029,7 +1045,7 @@
 	childDevice.ProxyAddress = &voltha.Device_ProxyAddress{DeviceId: parentDeviceID, DeviceType: pAgent.deviceType, ChannelId: uint32(channelID), OnuId: uint32(onuID)}
 
 	// Create and start a device agent for that device
-	agent := newAgent(dMgr.adapterProxy, childDevice, dMgr, dMgr.dProxy, dMgr.defaultTimeout)
+	agent := newAgent(dMgr.adapterProxy, childDevice, dMgr, dMgr.dbPath, dMgr.dProxy, dMgr.defaultTimeout)
 	childDevice, err := agent.start(ctx, childDevice)
 	if err != nil {
 		logger.Errorw("error-starting-child-device", log.Fields{"parent-device-id": childDevice.ParentId, "child-device-id": agent.deviceID, "error": err})
diff --git a/rw_core/test/common.go b/rw_core/test/common.go
new file mode 100644
index 0000000..a20f029
--- /dev/null
+++ b/rw_core/test/common.go
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package test provides common logger initialization for the rw_core test utilities
+package test
+
+import (
+	"github.com/opencord/voltha-lib-go/v3/pkg/log"
+)
+
+var logger log.Logger
+
+func init() {
+	// Set up this package so that its log level can be modified at run time
+	var err error
+	logger, err = log.AddPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "test"})
+	if err != nil {
+		panic(err)
+	}
+}
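
The init above registers the test package with the logging framework at ErrorLevel; verbosity can then be raised per package at run time, which is exactly what the device tests above do before starting the core. A one-line sketch using the voltha-lib-go log package shown in the imports:

    // raise the rw_core/core package to debug for the duration of a test run
    log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/core", log.DebugLevel)
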
diff --git a/rw_core/test/utils.go b/rw_core/test/utils.go
new file mode 100644
index 0000000..7d2dda0
--- /dev/null
+++ b/rw_core/test/utils.go
@@ -0,0 +1,156 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package test provides utilities shared by the rw_core unit tests
+package test
+
+import (
+	"strconv"
+	"testing"
+
+	"github.com/opencord/voltha-go/rw_core/config"
+	"github.com/opencord/voltha-go/rw_core/core/adapter"
+	cm "github.com/opencord/voltha-go/rw_core/mocks"
+	"github.com/opencord/voltha-lib-go/v3/pkg/adapters"
+	com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
+	"github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
+	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
+	"github.com/opencord/voltha-lib-go/v3/pkg/log"
+	mock_etcd "github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd"
+	"github.com/opencord/voltha-lib-go/v3/pkg/version"
+	"github.com/opencord/voltha-protos/v3/go/voltha"
+	"github.com/phayes/freeport"
+	"github.com/stretchr/testify/assert"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+const (
+	OltAdapter = iota
+	OnuAdapter
+)
+
+//CreateMockAdapter creates a mock OLT or ONU adapter, depending on the requested adapter type
+func CreateMockAdapter(adapterType int, kafkaClient kafka.Client, coreInstanceID string, coreName string, adapterName string) (adapters.IAdapter, error) {
+	var err error
+	var adapter adapters.IAdapter
+	adapterKafkaICProxy := kafka.NewInterContainerProxy(
+		kafka.MsgClient(kafkaClient),
+		kafka.DefaultTopic(&kafka.Topic{Name: adapterName}))
+	adapterCoreProxy := com.NewCoreProxy(adapterKafkaICProxy, adapterName, coreName)
+	var adapterReqHandler *com.RequestHandlerProxy
+	switch adapterType {
+	case OltAdapter:
+		adapter = cm.NewOLTAdapter(adapterCoreProxy)
+	case OnuAdapter:
+		adapter = cm.NewONUAdapter(adapterCoreProxy)
+	default:
+		logger.Fatalf("invalid-adapter-type-%d", adapterType)
+	}
+	adapterReqHandler = com.NewRequestHandlerProxy(coreInstanceID, adapter, adapterCoreProxy)
+
+	if err = adapterKafkaICProxy.Start(); err != nil {
+		logger.Errorw("Failure-starting-adapter-intercontainerProxy", log.Fields{"error": err})
+		return nil, err
+	}
+	if err = adapterKafkaICProxy.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: adapterName}, adapterReqHandler); err != nil {
+		logger.Errorw("Failure-to-subscribe-adapter-request-handler", log.Fields{"error": err})
+		return nil, err
+	}
+	return adapter, nil
+}
+
+//CreateAndregisterAdapters creates mock OLT and ONU adapters and registers them with the rw-core
+func CreateAndregisterAdapters(t *testing.T, kClient kafka.Client, coreInstanceID string, oltAdapterName string, onuAdapterName string, adapterMgr *adapter.Manager) (*cm.OLTAdapter, *cm.ONUAdapter) {
+	// Setup the mock OLT adapter
+	oltAdapter, err := CreateMockAdapter(OltAdapter, kClient, coreInstanceID, "rw_core", oltAdapterName)
+	assert.Nil(t, err)
+	assert.NotNil(t, oltAdapter)
+
+	//	Register the adapter
+	registrationData := &voltha.Adapter{
+		Id:             oltAdapterName,
+		Vendor:         "Voltha-olt",
+		Version:        version.VersionInfo.Version,
+		Type:           oltAdapterName,
+		CurrentReplica: 1,
+		TotalReplicas:  1,
+		Endpoint:       oltAdapterName,
+	}
+	types := []*voltha.DeviceType{{Id: oltAdapterName, Adapter: oltAdapterName, AcceptsAddRemoveFlowUpdates: true}}
+	deviceTypes := &voltha.DeviceTypes{Items: types}
+	if _, err := adapterMgr.RegisterAdapter(registrationData, deviceTypes); err != nil {
+		logger.Errorw("failed-to-register-adapter", log.Fields{"error": err})
+		assert.NotNil(t, err)
+	}
+
+	// Setup the mock ONU adapter
+	onuAdapter, err := CreateMockAdapter(OnuAdapter, kClient, coreInstanceID, "rw_core", onuAdapterName)
+
+	assert.Nil(t, err)
+	assert.NotNil(t, onuAdapter)
+	//	Register the adapter
+	registrationData = &voltha.Adapter{
+		Id:             onuAdapterName,
+		Vendor:         "Voltha-onu",
+		Version:        version.VersionInfo.Version,
+		Type:           onuAdapterName,
+		CurrentReplica: 1,
+		TotalReplicas:  1,
+		Endpoint:       onuAdapterName,
+	}
+	types = []*voltha.DeviceType{{Id: onuAdapterName, Adapter: onuAdapterName, AcceptsAddRemoveFlowUpdates: true}}
+	deviceTypes = &voltha.DeviceTypes{Items: types}
+	if _, err := adapterMgr.RegisterAdapter(registrationData, deviceTypes); err != nil {
+		logger.Errorw("failed-to-register-adapter", log.Fields{"error": err})
+		assert.NotNil(t, err)
+	}
+	return oltAdapter.(*cm.OLTAdapter), onuAdapter.(*cm.ONUAdapter)
+}
+
+//StartEmbeddedEtcdServer creates and starts an Embedded etcd server locally.
+func StartEmbeddedEtcdServer(configName, storageDir, logLevel string) (*mock_etcd.EtcdServer, int, error) {
+	kvClientPort, err := freeport.GetFreePort()
+	if err != nil {
+		return nil, 0, err
+	}
+	peerPort, err := freeport.GetFreePort()
+	if err != nil {
+		return nil, 0, err
+	}
+	etcdServer := mock_etcd.StartEtcdServer(mock_etcd.MKConfig(configName, kvClientPort, peerPort, storageDir, logLevel))
+	if etcdServer == nil {
+		return nil, 0, status.Error(codes.Internal, "Embedded server failed to start")
+	}
+	return etcdServer, kvClientPort, nil
+}
+
+//StopEmbeddedEtcdServer stops the embedded etcd server
+func StopEmbeddedEtcdServer(server *mock_etcd.EtcdServer) {
+	if server != nil {
+		server.Stop()
+	}
+}
+
+//SetupKVClient creates a new etcd client
+func SetupKVClient(cf *config.RWCoreFlags, coreInstanceID string) kvstore.Client {
+	addr := cf.KVStoreHost + ":" + strconv.Itoa(cf.KVStorePort)
+	client, err := kvstore.NewEtcdClient(addr, cf.KVStoreTimeout, log.FatalLevel)
+	if err != nil {
+		panic("no kv client")
+	}
+	return client
+}
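
Taken together, these utilities replace the per-file copies that the earlier hunks delete. A hypothetical convenience wrapper showing how a new core test might combine them (the function name, the topic/storage names and the KVStorePort assignment are illustrative assumptions, not part of this change); adapters would then be registered with CreateAndregisterAdapters exactly as the device tests above do:

    // bootstrapKVForTest starts an embedded etcd instance and returns a client bound to it.
    // It assumes it lives in this package, so the imports above are available.
    func bootstrapKVForTest(cfg *config.RWCoreFlags, coreInstanceID string) (*mock_etcd.EtcdServer, kvstore.Client, error) {
        etcdServer, kvClientPort, err := StartEmbeddedEtcdServer("voltha.rwcore.example.test", "voltha.rwcore.example.etcd", "error")
        if err != nil {
            return nil, nil, err
        }
        // point the core configuration at the embedded server before building the client
        cfg.KVStorePort = kvClientPort
        client := SetupKVClient(cfg, coreInstanceID)
        // the caller is expected to invoke StopEmbeddedEtcdServer(etcdServer) on teardown,
        // as stopAll does in the tests above
        return etcdServer, client, nil
    }
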