[VOL-3005] Separate Flows from Device

Also, some unit test helpers were moved to a test utility package.
The new loaders and Proxy implementation are applied.

Change-Id: Icf5a6f0a42a2dbaeff768fdb108f5e9b46644977
diff --git a/rw_core/core/device/agent_test.go b/rw_core/core/device/agent_test.go
index ffc2a3b..29d8062 100755
--- a/rw_core/core/device/agent_test.go
+++ b/rw_core/core/device/agent_test.go
@@ -18,13 +18,21 @@
 
 import (
 	"context"
+	"math/rand"
+	"sort"
+	"strings"
+	"sync"
+	"testing"
+	"time"
+
 	"github.com/gogo/protobuf/proto"
 	"github.com/opencord/voltha-go/db/model"
 	"github.com/opencord/voltha-go/rw_core/config"
 	"github.com/opencord/voltha-go/rw_core/core/adapter"
+	cm "github.com/opencord/voltha-go/rw_core/mocks"
+	tst "github.com/opencord/voltha-go/rw_core/test"
 	com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
 	"github.com/opencord/voltha-lib-go/v3/pkg/db"
-	"github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
 	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	mock_etcd "github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd"
@@ -33,24 +41,18 @@
 	"github.com/opencord/voltha-protos/v3/go/voltha"
 	"github.com/phayes/freeport"
 	"github.com/stretchr/testify/assert"
-	"google.golang.org/grpc/codes"
-	"google.golang.org/grpc/status"
-	"math/rand"
-	"sort"
-	"strconv"
-	"strings"
-	"sync"
-	"testing"
-	"time"
 )
 
 type DATest struct {
 	etcdServer       *mock_etcd.EtcdServer
 	deviceMgr        *Manager
 	logicalDeviceMgr *LogicalManager
+	adapterMgr       *adapter.Manager
 	kmp              kafka.InterContainerProxy
 	kClient          kafka.Client
 	kvClientPort     int
+	oltAdapter       *cm.OLTAdapter
+	onuAdapter       *cm.ONUAdapter
 	oltAdapterName   string
 	onuAdapterName   string
 	coreInstanceID   string
@@ -64,7 +66,7 @@
 	test := &DATest{}
 	// Start the embedded etcd server
 	var err error
-	test.etcdServer, test.kvClientPort, err = startEmbeddedEtcdServer("voltha.rwcore.da.test", "voltha.rwcore.da.etcd", "error")
+	test.etcdServer, test.kvClientPort, err = tst.StartEmbeddedEtcdServer("voltha.rwcore.da.test", "voltha.rwcore.da.etcd", "error")
 	if err != nil {
 		logger.Fatal(err)
 	}
@@ -120,7 +122,7 @@
 	}
 	cfg.GrpcPort = grpcPort
 	cfg.GrpcHost = "127.0.0.1"
-	client := setupKVClient(cfg, dat.coreInstanceID)
+	client := tst.SetupKVClient(cfg, dat.coreInstanceID)
 	backend := &db.Backend{
 		Client:                  client,
 		StoreType:               cfg.KVStoreType,
@@ -138,13 +140,18 @@
 
 	endpointMgr := kafka.NewEndpointManager(backend)
 	proxy := model.NewDBPath(backend)
-	adapterMgr := adapter.NewAdapterManager(proxy, dat.coreInstanceID, dat.kClient)
+	dat.adapterMgr = adapter.NewAdapterManager(proxy, dat.coreInstanceID, dat.kClient)
 
-	dat.deviceMgr, dat.logicalDeviceMgr = NewManagers(proxy, adapterMgr, dat.kmp, endpointMgr, cfg.CorePairTopic, dat.coreInstanceID, cfg.DefaultCoreTimeout)
+	dat.deviceMgr, dat.logicalDeviceMgr = NewManagers(proxy, dat.adapterMgr, dat.kmp, endpointMgr, cfg.CorePairTopic, dat.coreInstanceID, cfg.DefaultCoreTimeout)
+	dat.adapterMgr.Start(context.Background())
 	if err = dat.kmp.Start(); err != nil {
 		logger.Fatal("Cannot start InterContainerProxy")
 	}
-	adapterMgr.Start(context.Background())
+
+	if err := dat.kmp.SubscribeWithDefaultRequestHandler(kafka.Topic{Name: cfg.CorePairTopic}, kafka.OffsetNewest); err != nil {
+		logger.Fatalf("Cannot add default request handler: %s", err)
+	}
+
 }
 
 func (dat *DATest) stopAll() {
@@ -155,46 +162,14 @@
 		dat.kmp.Stop()
 	}
 	if dat.etcdServer != nil {
-		stopEmbeddedEtcdServer(dat.etcdServer)
+		tst.StopEmbeddedEtcdServer(dat.etcdServer)
 	}
 }
 
-//startEmbeddedEtcdServer creates and starts an Embedded etcd server locally.
-func startEmbeddedEtcdServer(configName, storageDir, logLevel string) (*mock_etcd.EtcdServer, int, error) {
-	kvClientPort, err := freeport.GetFreePort()
-	if err != nil {
-		return nil, 0, err
-	}
-	peerPort, err := freeport.GetFreePort()
-	if err != nil {
-		return nil, 0, err
-	}
-	etcdServer := mock_etcd.StartEtcdServer(mock_etcd.MKConfig(configName, kvClientPort, peerPort, storageDir, logLevel))
-	if etcdServer == nil {
-		return nil, 0, status.Error(codes.Internal, "Embedded server failed to start")
-	}
-	return etcdServer, kvClientPort, nil
-}
-
-func stopEmbeddedEtcdServer(server *mock_etcd.EtcdServer) {
-	if server != nil {
-		server.Stop()
-	}
-}
-
-func setupKVClient(cf *config.RWCoreFlags, coreInstanceID string) kvstore.Client {
-	addr := cf.KVStoreHost + ":" + strconv.Itoa(cf.KVStorePort)
-	client, err := kvstore.NewEtcdClient(addr, cf.KVStoreTimeout, log.FatalLevel)
-	if err != nil {
-		panic("no kv client")
-	}
-	return client
-}
-
 func (dat *DATest) createDeviceAgent(t *testing.T) *Agent {
 	deviceMgr := dat.deviceMgr
 	clonedDevice := proto.Clone(dat.device).(*voltha.Device)
-	deviceAgent := newAgent(deviceMgr.adapterProxy, clonedDevice, deviceMgr, deviceMgr.dProxy, deviceMgr.defaultTimeout)
+	deviceAgent := newAgent(deviceMgr.adapterProxy, clonedDevice, deviceMgr, deviceMgr.dbPath, deviceMgr.dProxy, deviceMgr.defaultTimeout)
 	d, err := deviceAgent.start(context.TODO(), clonedDevice)
 	assert.Nil(t, err)
 	assert.NotNil(t, d)
@@ -278,7 +253,7 @@
 		da := newDATest()
 		assert.NotNil(t, da)
 		defer da.stopAll()
-
+		log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/core", log.DebugLevel)
 		// Start the Core
 		da.startCore(false)
 
@@ -293,6 +268,37 @@
 		wg.Wait()
 	}
 }
+func TestFlowUpdates(t *testing.T) {
+	da := newDATest()
+	assert.NotNil(t, da)
+	defer da.stopAll()
+
+	log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/core", log.DebugLevel)
+	// Start the Core
+	da.startCore(false)
+	da.oltAdapter, da.onuAdapter = tst.CreateAndregisterAdapters(t, da.kClient, da.coreInstanceID, da.oltAdapterName, da.onuAdapterName, da.adapterMgr)
+
+	a := da.createDeviceAgent(t)
+	cloned := a.getDeviceWithoutLock()
+	err := a.updateDeviceStateInStoreWithoutLock(context.Background(), cloned, voltha.AdminState_ENABLED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVE)
+	assert.Nil(t, err)
+	da.testFlowAddDeletes(t, a)
+}
+
+func TestGroupUpdates(t *testing.T) {
+	da := newDATest()
+	assert.NotNil(t, da)
+	defer da.stopAll()
+
+	// Start the Core
+	da.startCore(false)
+	da.oltAdapter, da.onuAdapter = tst.CreateAndregisterAdapters(t, da.kClient, da.coreInstanceID, da.oltAdapterName, da.onuAdapterName, da.adapterMgr)
+	a := da.createDeviceAgent(t)
+	cloned := a.getDeviceWithoutLock()
+	err := a.updateDeviceStateInStoreWithoutLock(context.Background(), cloned, voltha.AdminState_ENABLED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVE)
+	assert.Nil(t, err)
+	da.testGroupAddDeletes(t, a)
+}
 
 func isFlowSliceEqual(a, b []*ofp.OfpFlowStats) bool {
 	if len(a) != len(b) {
@@ -329,196 +335,214 @@
 	}
 	return true
 }
-
-func TestFlowsToUpdateToDelete_EmptySlices(t *testing.T) {
-	newFlows := []*ofp.OfpFlowStats{}
-	existingFlows := []*ofp.OfpFlowStats{}
-	expectedNewFlows := []*ofp.OfpFlowStats{}
-	expectedFlowsToDelete := []*ofp.OfpFlowStats{}
-	expectedUpdatedAllFlows := []*ofp.OfpFlowStats{}
-	uNF, fD, uAF := flowsToUpdateToDelete(newFlows, existingFlows)
-	assert.True(t, isFlowSliceEqual(uNF, expectedNewFlows))
-	assert.True(t, isFlowSliceEqual(fD, expectedFlowsToDelete))
-	assert.True(t, isFlowSliceEqual(uAF, expectedUpdatedAllFlows))
+func changeToFlowList(flowList map[uint64]*ofp.OfpFlowStats) []*ofp.OfpFlowStats {
+	flows := make([]*ofp.OfpFlowStats, 0)
+	for _, flow := range flowList {
+		flows = append(flows, flow)
+	}
+	return flows
 }
-
-func TestFlowsToUpdateToDelete_NoExistingFlows(t *testing.T) {
+func changeToGroupList(groupList map[uint32]*ofp.OfpGroupEntry) []*ofp.OfpGroupEntry {
+	groups := make([]*ofp.OfpGroupEntry, 0)
+	for _, group := range groupList {
+		groups = append(groups, group)
+	}
+	return groups
+}
+func (dat *DATest) testFlowAddDeletes(t *testing.T, da *Agent) {
+	//Add new Flows on empty list
 	newFlows := []*ofp.OfpFlowStats{
 		{Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
 		{Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
 		{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
 	}
-	existingFlows := []*ofp.OfpFlowStats{}
-	expectedNewFlows := []*ofp.OfpFlowStats{
-		{Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
-		{Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
-		{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
-	}
-	expectedFlowsToDelete := []*ofp.OfpFlowStats{}
-	expectedUpdatedAllFlows := []*ofp.OfpFlowStats{
-		{Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
-		{Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
-		{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
-	}
-	uNF, fD, uAF := flowsToUpdateToDelete(newFlows, existingFlows)
-	assert.True(t, isFlowSliceEqual(uNF, expectedNewFlows))
-	assert.True(t, isFlowSliceEqual(fD, expectedFlowsToDelete))
-	assert.True(t, isFlowSliceEqual(uAF, expectedUpdatedAllFlows))
-}
+	err := da.addFlowsAndGroups(context.Background(), newFlows, []*ofp.OfpGroupEntry{}, &voltha.FlowMetadata{})
+	assert.Nil(t, err)
+	daFlows := changeToFlowList(da.listDeviceFlows())
+	assert.True(t, isFlowSliceEqual(newFlows, daFlows))
 
-func TestFlowsToUpdateToDelete_UpdateNoDelete(t *testing.T) {
-	newFlows := []*ofp.OfpFlowStats{
-		{Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
-		{Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
-		{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
-	}
-	existingFlows := []*ofp.OfpFlowStats{
-		{Id: 121, TableId: 1210, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1210000, PacketCount: 0},
-		{Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
-		{Id: 122, TableId: 1220, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1220000, PacketCount: 0},
-	}
-	expectedNewFlows := []*ofp.OfpFlowStats{
-		{Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
-		{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
-	}
-	expectedFlowsToDelete := []*ofp.OfpFlowStats{}
-	expectedUpdatedAllFlows := []*ofp.OfpFlowStats{
-		{Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
-		{Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
-		{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
-		{Id: 121, TableId: 1210, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1210000, PacketCount: 0},
-		{Id: 122, TableId: 1220, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1220000, PacketCount: 0},
-	}
-	uNF, fD, uAF := flowsToUpdateToDelete(newFlows, existingFlows)
-	assert.True(t, isFlowSliceEqual(uNF, expectedNewFlows))
-	assert.True(t, isFlowSliceEqual(fD, expectedFlowsToDelete))
-	assert.True(t, isFlowSliceEqual(uAF, expectedUpdatedAllFlows))
-}
-
-func TestFlowsToUpdateToDelete_UpdateAndDelete(t *testing.T) {
-	newFlows := []*ofp.OfpFlowStats{
-		{Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 20},
-		{Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
-		{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 10, Flags: 0, Cookie: 1250000, PacketCount: 0},
-		{Id: 126, TableId: 1260, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1260000, PacketCount: 0},
+	//Add new Flows on existing ones
+	newFlows = []*ofp.OfpFlowStats{
+		{Id: 126, TableId: 1260, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1260000, PacketCount: 0},
 		{Id: 127, TableId: 1270, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1270000, PacketCount: 0},
 	}
-	existingFlows := []*ofp.OfpFlowStats{
-		{Id: 121, TableId: 1210, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1210000, PacketCount: 0},
-		{Id: 122, TableId: 1220, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1220000, PacketCount: 0},
+
+	expectedFlows := []*ofp.OfpFlowStats{
+		{Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
+		{Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
+		{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
+		{Id: 126, TableId: 1260, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1260000, PacketCount: 0},
+		{Id: 127, TableId: 1270, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1270000, PacketCount: 0},
+	}
+
+	err = da.addFlowsAndGroups(context.Background(), newFlows, []*ofp.OfpGroupEntry{}, &voltha.FlowMetadata{})
+	assert.Nil(t, err)
+	daFlows = changeToFlowList(da.listDeviceFlows())
+	assert.True(t, isFlowSliceEqual(expectedFlows, daFlows))
+
+	//Add existing Flows again with a new flow
+	newFlows = []*ofp.OfpFlowStats{
+		{Id: 126, TableId: 1260, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1260000, PacketCount: 0},
+		{Id: 127, TableId: 1270, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1270001, PacketCount: 0},
+		{Id: 128, TableId: 1280, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1280000, PacketCount: 0},
+	}
+
+	expectedFlows = []*ofp.OfpFlowStats{
+		{Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
+		{Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
+		{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
+		{Id: 126, TableId: 1260, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1260000, PacketCount: 0},
+		{Id: 127, TableId: 1270, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1270001, PacketCount: 0},
+		{Id: 128, TableId: 1280, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1280000, PacketCount: 0},
+	}
+
+	err = da.addFlowsAndGroups(context.Background(), newFlows, []*ofp.OfpGroupEntry{}, &voltha.FlowMetadata{})
+	assert.Nil(t, err)
+	daFlows = changeToFlowList(da.listDeviceFlows())
+	assert.True(t, isFlowSliceEqual(expectedFlows, daFlows))
+
+	//Add already existing flows again
+	newFlows = []*ofp.OfpFlowStats{
+		{Id: 126, TableId: 1260, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1260000, PacketCount: 0},
+		{Id: 127, TableId: 1270, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1270001, PacketCount: 0},
+		{Id: 128, TableId: 1280, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1280000, PacketCount: 0},
+	}
+
+	expectedFlows = []*ofp.OfpFlowStats{
+		{Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
+		{Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
+		{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
+		{Id: 126, TableId: 1260, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1260000, PacketCount: 0},
+		{Id: 127, TableId: 1270, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1270001, PacketCount: 0},
+		{Id: 128, TableId: 1280, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1280000, PacketCount: 0},
+	}
+
+	err = da.addFlowsAndGroups(context.Background(), newFlows, []*ofp.OfpGroupEntry{}, &voltha.FlowMetadata{})
+	assert.Nil(t, err)
+	daFlows = changeToFlowList(da.listDeviceFlows())
+	assert.True(t, isFlowSliceEqual(expectedFlows, daFlows))
+
+	//Delete flows
+	flowsToDelete := []*ofp.OfpFlowStats{
+		{Id: 126, TableId: 1260, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1260000, PacketCount: 0},
+		{Id: 127, TableId: 1270, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1270001, PacketCount: 0},
+		{Id: 128, TableId: 1280, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1280000, PacketCount: 0},
+	}
+
+	expectedFlows = []*ofp.OfpFlowStats{
 		{Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
 		{Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
 		{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
 	}
-	expectedNewFlows := []*ofp.OfpFlowStats{
-		{Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 20},
-		{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 10, Flags: 0, Cookie: 1250000, PacketCount: 0},
-		{Id: 126, TableId: 1260, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1260000, PacketCount: 0},
-		{Id: 127, TableId: 1270, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1270000, PacketCount: 0},
-	}
-	expectedFlowsToDelete := []*ofp.OfpFlowStats{
-		{Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
+
+	err = da.deleteFlowsAndGroups(context.Background(), flowsToDelete, []*ofp.OfpGroupEntry{}, &voltha.FlowMetadata{})
+	assert.Nil(t, err)
+	daFlows = changeToFlowList(da.listDeviceFlows())
+	assert.True(t, isFlowSliceEqual(expectedFlows, daFlows))
+	//Delete flows, including a nonexistent one
+	flowsToDelete = []*ofp.OfpFlowStats{
 		{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
+		{Id: 129, TableId: 1290, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1290000, PacketCount: 0},
 	}
-	expectedUpdatedAllFlows := []*ofp.OfpFlowStats{
-		{Id: 121, TableId: 1210, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1210000, PacketCount: 0},
-		{Id: 122, TableId: 1220, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1220000, PacketCount: 0},
-		{Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 20},
+
+	expectedFlows = []*ofp.OfpFlowStats{
+		{Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
 		{Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
-		{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 10, Flags: 0, Cookie: 1250000, PacketCount: 0},
-		{Id: 126, TableId: 1260, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1260000, PacketCount: 0},
-		{Id: 127, TableId: 1270, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1270000, PacketCount: 0},
 	}
-	uNF, fD, uAF := flowsToUpdateToDelete(newFlows, existingFlows)
-	assert.True(t, isFlowSliceEqual(uNF, expectedNewFlows))
-	assert.True(t, isFlowSliceEqual(fD, expectedFlowsToDelete))
-	assert.True(t, isFlowSliceEqual(uAF, expectedUpdatedAllFlows))
+
+	err = da.deleteFlowsAndGroups(context.Background(), flowsToDelete, []*ofp.OfpGroupEntry{}, &voltha.FlowMetadata{})
+	assert.Nil(t, err)
+	daFlows = changeToFlowList(da.listDeviceFlows())
+	assert.True(t, isFlowSliceEqual(expectedFlows, daFlows))
 }
 
-func TestGroupsToUpdateToDelete_EmptySlices(t *testing.T) {
-	newGroups := []*ofp.OfpGroupEntry{}
-	existingGroups := []*ofp.OfpGroupEntry{}
-	expectedNewGroups := []*ofp.OfpGroupEntry{}
-	expectedGroupsToDelete := []*ofp.OfpGroupEntry{}
-	expectedUpdatedAllGroups := []*ofp.OfpGroupEntry{}
-	uNG, gD, uAG := groupsToUpdateToDelete(newGroups, existingGroups)
-	assert.True(t, isGroupSliceEqual(uNG, expectedNewGroups))
-	assert.True(t, isGroupSliceEqual(gD, expectedGroupsToDelete))
-	assert.True(t, isGroupSliceEqual(uAG, expectedUpdatedAllGroups))
-}
-
-func TestGroupsToUpdateToDelete_NoExistingGroups(t *testing.T) {
+func (dat *DATest) testGroupAddDeletes(t *testing.T, da *Agent) {
+	//Add new Groups on empty list
 	newGroups := []*ofp.OfpGroupEntry{
 		{Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
 		{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
 	}
-	existingGroups := []*ofp.OfpGroupEntry{}
-	expectedNewGroups := []*ofp.OfpGroupEntry{
-		{Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
-		{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
-	}
-	expectedGroupsToDelete := []*ofp.OfpGroupEntry{}
-	expectedUpdatedAllGroups := []*ofp.OfpGroupEntry{
-		{Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
-		{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
-	}
-	uNG, gD, uAG := groupsToUpdateToDelete(newGroups, existingGroups)
-	assert.True(t, isGroupSliceEqual(uNG, expectedNewGroups))
-	assert.True(t, isGroupSliceEqual(gD, expectedGroupsToDelete))
-	assert.True(t, isGroupSliceEqual(uAG, expectedUpdatedAllGroups))
-}
+	err := da.addFlowsAndGroups(context.Background(), []*ofp.OfpFlowStats{}, newGroups, &voltha.FlowMetadata{})
+	assert.Nil(t, err)
+	daGroups := changeToGroupList(da.listDeviceGroups())
+	assert.True(t, isGroupSliceEqual(newGroups, daGroups))
 
-func TestGroupsToUpdateToDelete_UpdateNoDelete(t *testing.T) {
-	newGroups := []*ofp.OfpGroupEntry{
-		{Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
-		{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
-	}
-	existingGroups := []*ofp.OfpGroupEntry{
-		{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
+	//Add new Groups on existing ones
+	newGroups = []*ofp.OfpGroupEntry{
 		{Desc: &ofp.OfpGroupDesc{Type: 3, GroupId: 30, Buckets: nil}},
 		{Desc: &ofp.OfpGroupDesc{Type: 4, GroupId: 40, Buckets: nil}},
 	}
-	expectedNewGroups := []*ofp.OfpGroupEntry{
-		{Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
-	}
-	expectedGroupsToDelete := []*ofp.OfpGroupEntry{}
-	expectedUpdatedAllGroups := []*ofp.OfpGroupEntry{
+	expectedGroups := []*ofp.OfpGroupEntry{
 		{Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
 		{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
 		{Desc: &ofp.OfpGroupDesc{Type: 3, GroupId: 30, Buckets: nil}},
 		{Desc: &ofp.OfpGroupDesc{Type: 4, GroupId: 40, Buckets: nil}},
 	}
-	uNG, gD, uAG := groupsToUpdateToDelete(newGroups, existingGroups)
-	assert.True(t, isGroupSliceEqual(uNG, expectedNewGroups))
-	assert.True(t, isGroupSliceEqual(gD, expectedGroupsToDelete))
-	assert.True(t, isGroupSliceEqual(uAG, expectedUpdatedAllGroups))
-}
+	err = da.addFlowsAndGroups(context.Background(), []*ofp.OfpFlowStats{}, newGroups, &voltha.FlowMetadata{})
+	assert.Nil(t, err)
+	daGroups = changeToGroupList(da.listDeviceGroups())
+	assert.True(t, isGroupSliceEqual(expectedGroups, daGroups))
 
-func TestGroupsToUpdateToDelete_UpdateWithDelete(t *testing.T) {
-	newGroups := []*ofp.OfpGroupEntry{
-		{Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
-		{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: []*ofp.OfpBucket{{WatchPort: 10}}}},
+	//Add existing Groups again with a modified one and a new one
+	newGroups = []*ofp.OfpGroupEntry{
+		{Desc: &ofp.OfpGroupDesc{Type: 3, GroupId: 30, Buckets: nil}},
+		{Desc: &ofp.OfpGroupDesc{Type: 44, GroupId: 40, Buckets: nil}},
+		{Desc: &ofp.OfpGroupDesc{Type: 5, GroupId: 50, Buckets: nil}},
 	}
-	existingGroups := []*ofp.OfpGroupEntry{
+	expectedGroups = []*ofp.OfpGroupEntry{
+		{Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
 		{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
 		{Desc: &ofp.OfpGroupDesc{Type: 3, GroupId: 30, Buckets: nil}},
-		{Desc: &ofp.OfpGroupDesc{Type: 4, GroupId: 40, Buckets: nil}},
+		{Desc: &ofp.OfpGroupDesc{Type: 44, GroupId: 40, Buckets: nil}},
+		{Desc: &ofp.OfpGroupDesc{Type: 5, GroupId: 50, Buckets: nil}},
 	}
-	expectedNewGroups := []*ofp.OfpGroupEntry{
+	err = da.addFlowsAndGroups(context.Background(), []*ofp.OfpFlowStats{}, newGroups, &voltha.FlowMetadata{})
+	assert.Nil(t, err)
+	daGroups = changeToGroupList(da.listDeviceGroups())
+	assert.True(t, isGroupSliceEqual(expectedGroups, daGroups))
+
+	//Modify Group
+	updtGroups := []*ofp.OfpGroupEntry{
+		{Desc: &ofp.OfpGroupDesc{Type: 33, GroupId: 30, Buckets: nil}},
+	}
+	expectedGroups = []*ofp.OfpGroupEntry{
 		{Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
-		{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: []*ofp.OfpBucket{{WatchPort: 10}}}},
-	}
-	expectedGroupsToDelete := []*ofp.OfpGroupEntry{
 		{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
+		{Desc: &ofp.OfpGroupDesc{Type: 33, GroupId: 30, Buckets: nil}},
+		{Desc: &ofp.OfpGroupDesc{Type: 44, GroupId: 40, Buckets: nil}},
+		{Desc: &ofp.OfpGroupDesc{Type: 5, GroupId: 50, Buckets: nil}},
 	}
-	expectedUpdatedAllGroups := []*ofp.OfpGroupEntry{
+	err = da.updateFlowsAndGroups(context.Background(), []*ofp.OfpFlowStats{}, updtGroups, &voltha.FlowMetadata{})
+	assert.Nil(t, err)
+	daGroups = changeToGroupList(da.listDeviceGroups())
+	assert.True(t, isGroupSliceEqual(expectedGroups, daGroups))
+
+	//Delete Group
+	delGroups := []*ofp.OfpGroupEntry{
+		{Desc: &ofp.OfpGroupDesc{Type: 33, GroupId: 30, Buckets: nil}},
+	}
+	expectedGroups = []*ofp.OfpGroupEntry{
 		{Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
-		{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: []*ofp.OfpBucket{{WatchPort: 10}}}},
-		{Desc: &ofp.OfpGroupDesc{Type: 3, GroupId: 30, Buckets: nil}},
+		{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
+		{Desc: &ofp.OfpGroupDesc{Type: 44, GroupId: 40, Buckets: nil}},
+		{Desc: &ofp.OfpGroupDesc{Type: 5, GroupId: 50, Buckets: nil}},
+	}
+	err = da.deleteFlowsAndGroups(context.Background(), []*ofp.OfpFlowStats{}, delGroups, &voltha.FlowMetadata{})
+	assert.Nil(t, err)
+	daGroups = changeToGroupList(da.listDeviceGroups())
+	assert.True(t, isGroupSliceEqual(expectedGroups, daGroups))
+
+	//Delete Group matched by GroupId (Type 4 differs from the stored Type 44)
+	delGroups = []*ofp.OfpGroupEntry{
 		{Desc: &ofp.OfpGroupDesc{Type: 4, GroupId: 40, Buckets: nil}},
 	}
-	uNG, gD, uAG := groupsToUpdateToDelete(newGroups, existingGroups)
-	assert.True(t, isGroupSliceEqual(uNG, expectedNewGroups))
-	assert.True(t, isGroupSliceEqual(gD, expectedGroupsToDelete))
-	assert.True(t, isGroupSliceEqual(uAG, expectedUpdatedAllGroups))
+	expectedGroups = []*ofp.OfpGroupEntry{
+		{Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
+		{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
+		{Desc: &ofp.OfpGroupDesc{Type: 5, GroupId: 50, Buckets: nil}},
+	}
+	err = da.deleteFlowsAndGroups(context.Background(), []*ofp.OfpFlowStats{}, delGroups, &voltha.FlowMetadata{})
+	assert.Nil(t, err)
+	daGroups = changeToGroupList(da.listDeviceGroups())
+	assert.True(t, isGroupSliceEqual(expectedGroups, daGroups))
 }