VOL-2909 - Disaggregating rw_core/core/.

This breaks the core package into logical components. (adapter manager, adapter proxy, devices, nbi/api), as well as the "core" which aggregates all these.

Change-Id: I257ac64024a1cf3efe3f5d89d508e60e6e681fb1
diff --git a/rw_core/core/device/agent_test.go b/rw_core/core/device/agent_test.go
new file mode 100755
index 0000000..ce69599
--- /dev/null
+++ b/rw_core/core/device/agent_test.go
@@ -0,0 +1,538 @@
+/*
+* Copyright 2019-present Open Networking Foundation
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+package device
+
+import (
+	"context"
+	"github.com/gogo/protobuf/proto"
+	"github.com/opencord/voltha-go/db/model"
+	"github.com/opencord/voltha-go/rw_core/config"
+	"github.com/opencord/voltha-go/rw_core/core/adapter"
+	com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
+	"github.com/opencord/voltha-lib-go/v3/pkg/db"
+	"github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
+	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
+	mock_etcd "github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd"
+	mock_kafka "github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka"
+	ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
+	"github.com/opencord/voltha-protos/v3/go/voltha"
+	"github.com/phayes/freeport"
+	"github.com/stretchr/testify/assert"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+	"math/rand"
+	"sort"
+	"strconv"
+	"strings"
+	"sync"
+	"testing"
+	"time"
+)
+
+type DATest struct {
+	etcdServer       *mock_etcd.EtcdServer
+	deviceMgr        *Manager
+	logicalDeviceMgr *LogicalManager
+	kmp              kafka.InterContainerProxy
+	kClient          kafka.Client
+	kvClientPort     int
+	oltAdapterName   string
+	onuAdapterName   string
+	coreInstanceID   string
+	defaultTimeout   time.Duration
+	maxTimeout       time.Duration
+	device           *voltha.Device
+	done             chan int
+}
+
+func newDATest() *DATest {
+	test := &DATest{}
+	// Start the embedded etcd server
+	var err error
+	test.etcdServer, test.kvClientPort, err = startEmbeddedEtcdServer("voltha.rwcore.da.test", "voltha.rwcore.da.etcd", "error")
+	if err != nil {
+		logger.Fatal(err)
+	}
+	// Create the kafka client
+	test.kClient = mock_kafka.NewKafkaClient()
+	test.oltAdapterName = "olt_adapter_mock"
+	test.onuAdapterName = "onu_adapter_mock"
+	test.coreInstanceID = "rw-da-test"
+	test.defaultTimeout = 5 * time.Second
+	test.maxTimeout = 20 * time.Second
+	test.done = make(chan int)
+	parentID := com.GetRandomString(10)
+	test.device = &voltha.Device{
+		Type:         "onu_adapter_mock",
+		ParentId:     parentID,
+		ParentPortNo: 1,
+		VendorId:     "onu_adapter_mock",
+		Adapter:      "onu_adapter_mock",
+		Vlan:         100,
+		Address:      nil,
+		ProxyAddress: &voltha.Device_ProxyAddress{
+			DeviceId:           parentID,
+			DeviceType:         "olt_adapter_mock",
+			ChannelId:          100,
+			ChannelGroupId:     0,
+			ChannelTermination: "",
+			OnuId:              2,
+		},
+		AdminState:    voltha.AdminState_PREPROVISIONED,
+		OperStatus:    voltha.OperStatus_UNKNOWN,
+		Reason:        "All good",
+		ConnectStatus: voltha.ConnectStatus_UNKNOWN,
+		Custom:        nil,
+		Ports: []*voltha.Port{
+			{PortNo: 1, Label: "pon-1", Type: voltha.Port_PON_ONU, AdminState: voltha.AdminState_ENABLED,
+				OperStatus: voltha.OperStatus_ACTIVE, Peers: []*voltha.Port_PeerPort{{DeviceId: parentID, PortNo: 1}}},
+			{PortNo: 100, Label: "uni-100", Type: voltha.Port_ETHERNET_UNI, AdminState: voltha.AdminState_ENABLED,
+				OperStatus: voltha.OperStatus_ACTIVE},
+		},
+	}
+	return test
+}
+
+type fakeEventCallbacks struct{}
+
+func (fakeEventCallbacks) SendChangeEvent(_ string, _ *ofp.OfpPortStatus)      {}
+func (fakeEventCallbacks) SendPacketIn(_ string, _ string, _ *ofp.OfpPacketIn) {}
+
+func (dat *DATest) startCore(inCompeteMode bool) {
+	cfg := config.NewRWCoreFlags()
+	cfg.CorePairTopic = "rw_core"
+	cfg.DefaultRequestTimeout = dat.defaultTimeout
+	cfg.KVStorePort = dat.kvClientPort
+	cfg.InCompetingMode = inCompeteMode
+	grpcPort, err := freeport.GetFreePort()
+	if err != nil {
+		logger.Fatal("Cannot get a freeport for grpc")
+	}
+	cfg.GrpcPort = grpcPort
+	cfg.GrpcHost = "127.0.0.1"
+	client := setupKVClient(cfg, dat.coreInstanceID)
+	backend := &db.Backend{
+		Client:                  client,
+		StoreType:               cfg.KVStoreType,
+		Host:                    cfg.KVStoreHost,
+		Port:                    cfg.KVStorePort,
+		Timeout:                 cfg.KVStoreTimeout,
+		LivenessChannelInterval: cfg.LiveProbeInterval / 2,
+		PathPrefix:              cfg.KVStoreDataPrefix}
+	dat.kmp = kafka.NewInterContainerProxy(
+		kafka.InterContainerHost(cfg.KafkaAdapterHost),
+		kafka.InterContainerPort(cfg.KafkaAdapterPort),
+		kafka.MsgClient(dat.kClient),
+		kafka.DefaultTopic(&kafka.Topic{Name: cfg.CoreTopic}),
+		kafka.DeviceDiscoveryTopic(&kafka.Topic{Name: cfg.AffinityRouterTopic}))
+
+	endpointMgr := kafka.NewEndpointManager(backend)
+	proxy := model.NewProxy(backend, "/")
+	adapterMgr := adapter.NewAdapterManager(proxy, dat.coreInstanceID, dat.kClient)
+
+	dat.deviceMgr, dat.logicalDeviceMgr = NewDeviceManagers(proxy, adapterMgr, dat.kmp, endpointMgr, cfg.CorePairTopic, dat.coreInstanceID, cfg.DefaultCoreTimeout)
+	dat.logicalDeviceMgr.SetEventCallbacks(fakeEventCallbacks{})
+	if err = dat.kmp.Start(); err != nil {
+		logger.Fatal("Cannot start InterContainerProxy")
+	}
+	if err = adapterMgr.Start(context.Background()); err != nil {
+		logger.Fatal("Cannot start adapterMgr")
+	}
+	dat.deviceMgr.Start(context.Background())
+	dat.logicalDeviceMgr.Start(context.Background())
+}
+
+func (dat *DATest) stopAll() {
+	if dat.kClient != nil {
+		dat.kClient.Stop()
+	}
+	if dat.logicalDeviceMgr != nil {
+		dat.logicalDeviceMgr.Stop(context.Background())
+	}
+	if dat.deviceMgr != nil {
+		dat.deviceMgr.Stop(context.Background())
+	}
+	if dat.kmp != nil {
+		dat.kmp.Stop()
+	}
+	if dat.etcdServer != nil {
+		stopEmbeddedEtcdServer(dat.etcdServer)
+	}
+}
+
+//startEmbeddedEtcdServer creates and starts an Embedded etcd server locally.
+func startEmbeddedEtcdServer(configName, storageDir, logLevel string) (*mock_etcd.EtcdServer, int, error) {
+	kvClientPort, err := freeport.GetFreePort()
+	if err != nil {
+		return nil, 0, err
+	}
+	peerPort, err := freeport.GetFreePort()
+	if err != nil {
+		return nil, 0, err
+	}
+	etcdServer := mock_etcd.StartEtcdServer(mock_etcd.MKConfig(configName, kvClientPort, peerPort, storageDir, logLevel))
+	if etcdServer == nil {
+		return nil, 0, status.Error(codes.Internal, "Embedded server failed to start")
+	}
+	return etcdServer, kvClientPort, nil
+}
+
+func stopEmbeddedEtcdServer(server *mock_etcd.EtcdServer) {
+	if server != nil {
+		server.Stop()
+	}
+}
+
+func setupKVClient(cf *config.RWCoreFlags, coreInstanceID string) kvstore.Client {
+	addr := cf.KVStoreHost + ":" + strconv.Itoa(cf.KVStorePort)
+	client, err := kvstore.NewEtcdClient(addr, cf.KVStoreTimeout)
+	if err != nil {
+		panic("no kv client")
+	}
+	return client
+}
+
+func (dat *DATest) createDeviceAgent(t *testing.T) *Agent {
+	deviceMgr := dat.deviceMgr
+	clonedDevice := proto.Clone(dat.device).(*voltha.Device)
+	deviceAgent := newAgent(deviceMgr.adapterProxy, clonedDevice, deviceMgr, deviceMgr.clusterDataProxy, deviceMgr.defaultTimeout)
+	d, err := deviceAgent.start(context.TODO(), clonedDevice)
+	assert.Nil(t, err)
+	assert.NotNil(t, d)
+	deviceMgr.addDeviceAgentToMap(deviceAgent)
+	return deviceAgent
+}
+
+func (dat *DATest) updateDeviceConcurrently(t *testing.T, da *Agent, globalWG *sync.WaitGroup) {
+	originalDevice, err := da.getDevice(context.Background())
+	assert.Nil(t, err)
+	assert.NotNil(t, originalDevice)
+	var localWG sync.WaitGroup
+
+	// Update device routine
+	var (
+		root         = false
+		vendor       = "onu_adapter_mock"
+		model        = "go-mock"
+		serialNumber = com.GetRandomSerialNumber()
+		macAddress   = strings.ToUpper(com.GetRandomMacAddress())
+		vlan         = rand.Uint32()
+		reason       = "testing concurrent device update"
+		portToAdd    = &voltha.Port{PortNo: 101, Label: "uni-101", Type: voltha.Port_ETHERNET_UNI, AdminState: voltha.AdminState_ENABLED,
+			OperStatus: voltha.OperStatus_ACTIVE}
+	)
+	localWG.Add(1)
+	go func() {
+		deviceToUpdate := proto.Clone(originalDevice).(*voltha.Device)
+		deviceToUpdate.Root = root
+		deviceToUpdate.Vendor = vendor
+		deviceToUpdate.Model = model
+		deviceToUpdate.SerialNumber = serialNumber
+		deviceToUpdate.MacAddress = macAddress
+		deviceToUpdate.Vlan = vlan
+		deviceToUpdate.Reason = reason
+		err := da.updateDeviceUsingAdapterData(context.Background(), deviceToUpdate)
+		assert.Nil(t, err)
+		localWG.Done()
+	}()
+
+	// Update the device status routine
+	localWG.Add(1)
+	go func() {
+		err := da.updateDeviceStatus(context.Background(), voltha.OperStatus_ACTIVE, voltha.ConnectStatus_REACHABLE)
+		assert.Nil(t, err)
+		localWG.Done()
+	}()
+
+	// Add a port routine
+	localWG.Add(1)
+	go func() {
+		err := da.addPort(context.Background(), portToAdd)
+		assert.Nil(t, err)
+		localWG.Done()
+	}()
+
+	// wait for go routines to be done
+	localWG.Wait()
+
+	expectedChange := proto.Clone(originalDevice).(*voltha.Device)
+	expectedChange.OperStatus = voltha.OperStatus_ACTIVE
+	expectedChange.ConnectStatus = voltha.ConnectStatus_REACHABLE
+	expectedChange.Ports = append(expectedChange.Ports, portToAdd)
+	expectedChange.Root = root
+	expectedChange.Vendor = vendor
+	expectedChange.Model = model
+	expectedChange.SerialNumber = serialNumber
+	expectedChange.MacAddress = macAddress
+	expectedChange.Vlan = vlan
+	expectedChange.Reason = reason
+
+	updatedDevice, _ := da.getDevice(context.Background())
+	assert.NotNil(t, updatedDevice)
+	assert.True(t, proto.Equal(expectedChange, updatedDevice))
+
+	globalWG.Done()
+}
+
+func TestConcurrentDevices(t *testing.T) {
+	for i := 0; i < 2; i++ {
+		da := newDATest()
+		assert.NotNil(t, da)
+		defer da.stopAll()
+
+		// Start the Core
+		da.startCore(false)
+
+		var wg sync.WaitGroup
+		numConCurrentDeviceAgents := 20
+		for i := 0; i < numConCurrentDeviceAgents; i++ {
+			wg.Add(1)
+			a := da.createDeviceAgent(t)
+			go da.updateDeviceConcurrently(t, a, &wg)
+		}
+
+		wg.Wait()
+	}
+}
+
+func isFlowSliceEqual(a, b []*ofp.OfpFlowStats) bool {
+	if len(a) != len(b) {
+		return false
+	}
+	sort.Slice(a, func(i, j int) bool {
+		return a[i].Id < a[j].Id
+	})
+	sort.Slice(b, func(i, j int) bool {
+		return b[i].Id < b[j].Id
+	})
+	for idx := range a {
+		if !proto.Equal(a[idx], b[idx]) {
+			return false
+		}
+	}
+	return true
+}
+
+func isGroupSliceEqual(a, b []*ofp.OfpGroupEntry) bool {
+	if len(a) != len(b) {
+		return false
+	}
+	sort.Slice(a, func(i, j int) bool {
+		return a[i].Desc.GroupId < a[j].Desc.GroupId
+	})
+	sort.Slice(b, func(i, j int) bool {
+		return b[i].Desc.GroupId < b[j].Desc.GroupId
+	})
+	for idx := range a {
+		if !proto.Equal(a[idx], b[idx]) {
+			return false
+		}
+	}
+	return true
+}
+
+func TestFlowsToUpdateToDelete_EmptySlices(t *testing.T) {
+	newFlows := []*ofp.OfpFlowStats{}
+	existingFlows := []*ofp.OfpFlowStats{}
+	expectedNewFlows := []*ofp.OfpFlowStats{}
+	expectedFlowsToDelete := []*ofp.OfpFlowStats{}
+	expectedUpdatedAllFlows := []*ofp.OfpFlowStats{}
+	uNF, fD, uAF := flowsToUpdateToDelete(newFlows, existingFlows)
+	assert.True(t, isFlowSliceEqual(uNF, expectedNewFlows))
+	assert.True(t, isFlowSliceEqual(fD, expectedFlowsToDelete))
+	assert.True(t, isFlowSliceEqual(uAF, expectedUpdatedAllFlows))
+}
+
+func TestFlowsToUpdateToDelete_NoExistingFlows(t *testing.T) {
+	newFlows := []*ofp.OfpFlowStats{
+		{Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
+		{Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
+		{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
+	}
+	existingFlows := []*ofp.OfpFlowStats{}
+	expectedNewFlows := []*ofp.OfpFlowStats{
+		{Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
+		{Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
+		{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
+	}
+	expectedFlowsToDelete := []*ofp.OfpFlowStats{}
+	expectedUpdatedAllFlows := []*ofp.OfpFlowStats{
+		{Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
+		{Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
+		{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
+	}
+	uNF, fD, uAF := flowsToUpdateToDelete(newFlows, existingFlows)
+	assert.True(t, isFlowSliceEqual(uNF, expectedNewFlows))
+	assert.True(t, isFlowSliceEqual(fD, expectedFlowsToDelete))
+	assert.True(t, isFlowSliceEqual(uAF, expectedUpdatedAllFlows))
+}
+
+func TestFlowsToUpdateToDelete_UpdateNoDelete(t *testing.T) {
+	newFlows := []*ofp.OfpFlowStats{
+		{Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
+		{Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
+		{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
+	}
+	existingFlows := []*ofp.OfpFlowStats{
+		{Id: 121, TableId: 1210, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1210000, PacketCount: 0},
+		{Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
+		{Id: 122, TableId: 1220, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1220000, PacketCount: 0},
+	}
+	expectedNewFlows := []*ofp.OfpFlowStats{
+		{Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
+		{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
+	}
+	expectedFlowsToDelete := []*ofp.OfpFlowStats{}
+	expectedUpdatedAllFlows := []*ofp.OfpFlowStats{
+		{Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
+		{Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
+		{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
+		{Id: 121, TableId: 1210, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1210000, PacketCount: 0},
+		{Id: 122, TableId: 1220, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1220000, PacketCount: 0},
+	}
+	uNF, fD, uAF := flowsToUpdateToDelete(newFlows, existingFlows)
+	assert.True(t, isFlowSliceEqual(uNF, expectedNewFlows))
+	assert.True(t, isFlowSliceEqual(fD, expectedFlowsToDelete))
+	assert.True(t, isFlowSliceEqual(uAF, expectedUpdatedAllFlows))
+}
+
+func TestFlowsToUpdateToDelete_UpdateAndDelete(t *testing.T) {
+	newFlows := []*ofp.OfpFlowStats{
+		{Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 20},
+		{Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
+		{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 10, Flags: 0, Cookie: 1250000, PacketCount: 0},
+		{Id: 126, TableId: 1260, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1260000, PacketCount: 0},
+		{Id: 127, TableId: 1270, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1270000, PacketCount: 0},
+	}
+	existingFlows := []*ofp.OfpFlowStats{
+		{Id: 121, TableId: 1210, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1210000, PacketCount: 0},
+		{Id: 122, TableId: 1220, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1220000, PacketCount: 0},
+		{Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
+		{Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
+		{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
+	}
+	expectedNewFlows := []*ofp.OfpFlowStats{
+		{Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 20},
+		{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 10, Flags: 0, Cookie: 1250000, PacketCount: 0},
+		{Id: 126, TableId: 1260, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1260000, PacketCount: 0},
+		{Id: 127, TableId: 1270, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1270000, PacketCount: 0},
+	}
+	expectedFlowsToDelete := []*ofp.OfpFlowStats{
+		{Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
+		{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
+	}
+	expectedUpdatedAllFlows := []*ofp.OfpFlowStats{
+		{Id: 121, TableId: 1210, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1210000, PacketCount: 0},
+		{Id: 122, TableId: 1220, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1220000, PacketCount: 0},
+		{Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 20},
+		{Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
+		{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 10, Flags: 0, Cookie: 1250000, PacketCount: 0},
+		{Id: 126, TableId: 1260, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1260000, PacketCount: 0},
+		{Id: 127, TableId: 1270, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1270000, PacketCount: 0},
+	}
+	uNF, fD, uAF := flowsToUpdateToDelete(newFlows, existingFlows)
+	assert.True(t, isFlowSliceEqual(uNF, expectedNewFlows))
+	assert.True(t, isFlowSliceEqual(fD, expectedFlowsToDelete))
+	assert.True(t, isFlowSliceEqual(uAF, expectedUpdatedAllFlows))
+}
+
+func TestGroupsToUpdateToDelete_EmptySlices(t *testing.T) {
+	newGroups := []*ofp.OfpGroupEntry{}
+	existingGroups := []*ofp.OfpGroupEntry{}
+	expectedNewGroups := []*ofp.OfpGroupEntry{}
+	expectedGroupsToDelete := []*ofp.OfpGroupEntry{}
+	expectedUpdatedAllGroups := []*ofp.OfpGroupEntry{}
+	uNG, gD, uAG := groupsToUpdateToDelete(newGroups, existingGroups)
+	assert.True(t, isGroupSliceEqual(uNG, expectedNewGroups))
+	assert.True(t, isGroupSliceEqual(gD, expectedGroupsToDelete))
+	assert.True(t, isGroupSliceEqual(uAG, expectedUpdatedAllGroups))
+}
+
+func TestGroupsToUpdateToDelete_NoExistingGroups(t *testing.T) {
+	newGroups := []*ofp.OfpGroupEntry{
+		{Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
+		{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
+	}
+	existingGroups := []*ofp.OfpGroupEntry{}
+	expectedNewGroups := []*ofp.OfpGroupEntry{
+		{Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
+		{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
+	}
+	expectedGroupsToDelete := []*ofp.OfpGroupEntry{}
+	expectedUpdatedAllGroups := []*ofp.OfpGroupEntry{
+		{Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
+		{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
+	}
+	uNG, gD, uAG := groupsToUpdateToDelete(newGroups, existingGroups)
+	assert.True(t, isGroupSliceEqual(uNG, expectedNewGroups))
+	assert.True(t, isGroupSliceEqual(gD, expectedGroupsToDelete))
+	assert.True(t, isGroupSliceEqual(uAG, expectedUpdatedAllGroups))
+}
+
+func TestGroupsToUpdateToDelete_UpdateNoDelete(t *testing.T) {
+	newGroups := []*ofp.OfpGroupEntry{
+		{Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
+		{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
+	}
+	existingGroups := []*ofp.OfpGroupEntry{
+		{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
+		{Desc: &ofp.OfpGroupDesc{Type: 3, GroupId: 30, Buckets: nil}},
+		{Desc: &ofp.OfpGroupDesc{Type: 4, GroupId: 40, Buckets: nil}},
+	}
+	expectedNewGroups := []*ofp.OfpGroupEntry{
+		{Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
+	}
+	expectedGroupsToDelete := []*ofp.OfpGroupEntry{}
+	expectedUpdatedAllGroups := []*ofp.OfpGroupEntry{
+		{Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
+		{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
+		{Desc: &ofp.OfpGroupDesc{Type: 3, GroupId: 30, Buckets: nil}},
+		{Desc: &ofp.OfpGroupDesc{Type: 4, GroupId: 40, Buckets: nil}},
+	}
+	uNG, gD, uAG := groupsToUpdateToDelete(newGroups, existingGroups)
+	assert.True(t, isGroupSliceEqual(uNG, expectedNewGroups))
+	assert.True(t, isGroupSliceEqual(gD, expectedGroupsToDelete))
+	assert.True(t, isGroupSliceEqual(uAG, expectedUpdatedAllGroups))
+}
+
+func TestGroupsToUpdateToDelete_UpdateWithDelete(t *testing.T) {
+	newGroups := []*ofp.OfpGroupEntry{
+		{Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
+		{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: []*ofp.OfpBucket{{WatchPort: 10}}}},
+	}
+	existingGroups := []*ofp.OfpGroupEntry{
+		{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
+		{Desc: &ofp.OfpGroupDesc{Type: 3, GroupId: 30, Buckets: nil}},
+		{Desc: &ofp.OfpGroupDesc{Type: 4, GroupId: 40, Buckets: nil}},
+	}
+	expectedNewGroups := []*ofp.OfpGroupEntry{
+		{Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
+		{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: []*ofp.OfpBucket{{WatchPort: 10}}}},
+	}
+	expectedGroupsToDelete := []*ofp.OfpGroupEntry{
+		{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
+	}
+	expectedUpdatedAllGroups := []*ofp.OfpGroupEntry{
+		{Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
+		{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: []*ofp.OfpBucket{{WatchPort: 10}}}},
+		{Desc: &ofp.OfpGroupDesc{Type: 3, GroupId: 30, Buckets: nil}},
+		{Desc: &ofp.OfpGroupDesc{Type: 4, GroupId: 40, Buckets: nil}},
+	}
+	uNG, gD, uAG := groupsToUpdateToDelete(newGroups, existingGroups)
+	assert.True(t, isGroupSliceEqual(uNG, expectedNewGroups))
+	assert.True(t, isGroupSliceEqual(gD, expectedGroupsToDelete))
+	assert.True(t, isGroupSliceEqual(uAG, expectedUpdatedAllGroups))
+}