VOL-2867 - Simplified the logic for devices' flow & group updates.

Change-Id: I12b9916212dc66ecdf6d975de499dd767fac248f
diff --git a/rw_core/core/device/agent_flow.go b/rw_core/core/device/agent_flow.go
index 479f0dc..52c7706 100644
--- a/rw_core/core/device/agent_flow.go
+++ b/rw_core/core/device/agent_flow.go
@@ -54,16 +54,6 @@
 	if err != nil {
 		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
 	}
-	updatedAllFlows := make([]*ofp.OfpFlowStats, 0)
-	if !dType.AcceptsAddRemoveFlowUpdates {
-		flowIDs := agent.flowLoader.ListIDs()
-		for flowID := range flowIDs {
-			if flowHandle, have := agent.flowLoader.Lock(flowID); have {
-				updatedAllFlows = append(updatedAllFlows, flowHandle.GetReadOnly())
-				flowHandle.Unlock()
-			}
-		}
-	}
 	flowsToAdd := make([]*ofp.OfpFlowStats, 0)
 	flowsToDelete := make([]*ofp.OfpFlowStats, 0)
 	for _, flow := range newFlows {
@@ -71,10 +61,8 @@
 		if err != nil {
 			return coreutils.DoneResponse(), err
 		}
-
 		if created {
 			flowsToAdd = append(flowsToAdd, flow)
-			updatedAllFlows = append(updatedAllFlows, flow)
 		} else {
 			flowToReplace := flowHandle.GetReadOnly()
 			if !proto.Equal(flowToReplace, flow) {
@@ -85,13 +73,11 @@
 				}
 				flowsToDelete = append(flowsToDelete, flowToReplace)
 				flowsToAdd = append(flowsToAdd, flow)
-				updatedAllFlows = replaceFlowInList(updatedAllFlows, flowToReplace, flow)
 			} else {
 				//No need to change the flow. It is already exist.
 				logger.Debugw(ctx, "No-need-to-change-already-existing-flow", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "flow-metadata": flowMetadata})
 			}
 		}
-
 		flowHandle.Unlock()
 	}
 
@@ -105,7 +91,9 @@
 	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
 	response := coreutils.NewResponse()
 	if !dType.AcceptsAddRemoveFlowUpdates {
-		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &ofp.Flows{Items: updatedAllFlows}, &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}}, flowMetadata)
+
+		updatedAllFlows := agent.listDeviceFlows()
+		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, updatedAllFlows, nil, flowMetadata)
 		if err != nil {
 			cancel()
 			return coreutils.DoneResponse(), err
@@ -144,27 +132,13 @@
 	if err != nil {
 		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
 	}
-	updatedAllFlows := make([]*ofp.OfpFlowStats, 0)
-	if !dType.AcceptsAddRemoveFlowUpdates {
-		flowIDs := agent.flowLoader.ListIDs()
-		for flowID := range flowIDs {
-			if flowHandle, have := agent.flowLoader.Lock(flowID); have {
-				updatedAllFlows = append(updatedAllFlows, flowHandle.GetReadOnly())
-				flowHandle.Unlock()
-			}
-		}
-	}
 	for _, flow := range flowsToDel {
 		if flowHandle, have := agent.flowLoader.Lock(flow.Id); have {
 			// Update the store and cache
-			flowToDelete := flowHandle.GetReadOnly()
 			if err := flowHandle.Delete(ctx); err != nil {
 				flowHandle.Unlock()
 				return coreutils.DoneResponse(), err
 			}
-			if idx := fu.FindFlows(updatedAllFlows, flowToDelete); idx != -1 {
-				updatedAllFlows = deleteFlowWithoutPreservingOrder(updatedAllFlows, idx)
-			}
 			flowHandle.Unlock()
 		}
 	}
@@ -173,7 +147,9 @@
 	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
 	response := coreutils.NewResponse()
 	if !dType.AcceptsAddRemoveFlowUpdates {
-		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: updatedAllFlows}, &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}}, flowMetadata)
+
+		updatedAllFlows := agent.listDeviceFlows()
+		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, updatedAllFlows, nil, flowMetadata)
 		if err != nil {
 			cancel()
 			return coreutils.DoneResponse(), err
@@ -215,19 +191,8 @@
 	if err != nil {
 		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
 	}
-	updatedAllFlows := make([]*ofp.OfpFlowStats, 0)
-	if !dType.AcceptsAddRemoveFlowUpdates {
-		flowIDs := agent.flowLoader.ListIDs()
-		for flowID := range flowIDs {
-			if flowHandle, have := agent.flowLoader.Lock(flowID); have {
-				updatedAllFlows = append(updatedAllFlows, flowHandle.GetReadOnly())
-				flowHandle.Unlock()
-			}
-		}
-	}
-	flowsToAdd := make([]*ofp.OfpFlowStats, 0)
-	flowsToDelete := make([]*ofp.OfpFlowStats, 0)
-
+	flowsToAdd := make([]*ofp.OfpFlowStats, 0, len(updatedFlows))
+	flowsToDelete := make([]*ofp.OfpFlowStats, 0, len(updatedFlows))
 	for _, flow := range updatedFlows {
 		if flowHandle, have := agent.flowLoader.Lock(flow.Id); have {
 			flowToDelete := flowHandle.GetReadOnly()
@@ -239,7 +204,6 @@
 
 			flowsToDelete = append(flowsToDelete, flowToDelete)
 			flowsToAdd = append(flowsToAdd, flow)
-			updatedAllFlows = replaceFlowInList(updatedAllFlows, flowToDelete, flow)
 			flowHandle.Unlock()
 		}
 	}
@@ -248,7 +212,8 @@
 	response := coreutils.NewResponse()
 	// Process bulk flow update differently than incremental update
 	if !dType.AcceptsAddRemoveFlowUpdates {
-		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: updatedAllFlows}, &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}}, nil)
+		updatedAllFlows := agent.listDeviceFlows()
+		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, updatedAllFlows, nil, nil)
 		if err != nil {
 			cancel()
 			return coreutils.DoneResponse(), err
@@ -288,23 +253,6 @@
 	return response, nil
 }
 
-//replaceFlowInList removes the old flow from list and adds the new one.
-func replaceFlowInList(flowList []*ofp.OfpFlowStats, oldFlow *ofp.OfpFlowStats, newFlow *ofp.OfpFlowStats) []*ofp.OfpFlowStats {
-	if idx := fu.FindFlows(flowList, oldFlow); idx != -1 {
-		flowList = deleteFlowWithoutPreservingOrder(flowList, idx)
-	}
-	flowList = append(flowList, newFlow)
-	return flowList
-}
-
-//deleteFlowWithoutPreservingOrder removes a flow specified by index from the flows slice.  This function will
-//panic if the index is out of range.
-func deleteFlowWithoutPreservingOrder(flows []*ofp.OfpFlowStats, index int) []*ofp.OfpFlowStats {
-	flows[index] = flows[len(flows)-1]
-	flows[len(flows)-1] = nil
-	return flows[:len(flows)-1]
-}
-
 //filterOutFlows removes flows from a device using the uni-port as filter
 func (agent *Agent) filterOutFlows(ctx context.Context, uniPort uint32, flowMetadata *voltha.FlowMetadata) error {
 	var flowsToDelete []*ofp.OfpFlowStats
diff --git a/rw_core/core/device/agent_group.go b/rw_core/core/device/agent_group.go
index e11aee8..86cc108 100644
--- a/rw_core/core/device/agent_group.go
+++ b/rw_core/core/device/agent_group.go
@@ -22,7 +22,6 @@
 
 	"github.com/gogo/protobuf/proto"
 	coreutils "github.com/opencord/voltha-go/rw_core/utils"
-	fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
@@ -56,21 +55,10 @@
 	if err != nil {
 		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
 	}
-	updatedAllGroups := make([]*ofp.OfpGroupEntry, 0)
-	if !dType.AcceptsAddRemoveFlowUpdates {
-		groupIDs := agent.groupLoader.ListIDs()
-		for groupID := range groupIDs {
-			if grpHandle, have := agent.groupLoader.Lock(groupID); have {
-				updatedAllGroups = append(updatedAllGroups, grpHandle.GetReadOnly())
-				grpHandle.Unlock()
-			}
-		}
-	}
 
 	groupsToAdd := make([]*ofp.OfpGroupEntry, 0)
 	groupsToDelete := make([]*ofp.OfpGroupEntry, 0)
 	for _, group := range newGroups {
-
 		groupHandle, created, err := agent.groupLoader.LockOrCreate(ctx, group)
 		if err != nil {
 			return coreutils.DoneResponse(), err
@@ -78,7 +66,6 @@
 
 		if created {
 			groupsToAdd = append(groupsToAdd, group)
-			updatedAllGroups = append(updatedAllGroups, group)
 		} else {
 			groupToChange := groupHandle.GetReadOnly()
 			if !proto.Equal(groupToChange, group) {
@@ -89,7 +76,6 @@
 				}
 				groupsToDelete = append(groupsToDelete, groupToChange)
 				groupsToAdd = append(groupsToAdd, group)
-				updatedAllGroups = replaceGroupInList(updatedAllGroups, groupToChange, group)
 			} else {
 				//No need to change the group. It is already exist.
 				logger.Debugw(ctx, "No-need-to-change-already-existing-group", log.Fields{"device-id": agent.deviceID, "group": newGroups, "flow-metadata": flowMetadata})
@@ -108,7 +94,8 @@
 	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
 	response := coreutils.NewResponse()
 	if !dType.AcceptsAddRemoveFlowUpdates {
-		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: []*ofp.OfpFlowStats{}}, &voltha.FlowGroups{Items: updatedAllGroups}, flowMetadata)
+		updatedAllGroups := agent.listDeviceGroups()
+		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, nil, updatedAllGroups, flowMetadata)
 		if err != nil {
 			cancel()
 			return coreutils.DoneResponse(), err
@@ -146,16 +133,6 @@
 	if err != nil {
 		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
 	}
-	updatedAllGroups := make([]*ofp.OfpGroupEntry, 0)
-	if !dType.AcceptsAddRemoveFlowUpdates {
-		groupIDs := agent.groupLoader.ListIDs()
-		for groupID := range groupIDs {
-			if grpHandle, have := agent.groupLoader.Lock(groupID); have {
-				updatedAllGroups = append(updatedAllGroups, grpHandle.GetReadOnly())
-				grpHandle.Unlock()
-			}
-		}
-	}
 
 	for _, group := range groupsToDel {
 		if groupHandle, have := agent.groupLoader.Lock(group.Desc.GroupId); have {
@@ -164,9 +141,6 @@
 				groupHandle.Unlock()
 				return coreutils.DoneResponse(), err
 			}
-			if idx := fu.FindGroup(updatedAllGroups, group.Desc.GroupId); idx != -1 {
-				updatedAllGroups = deleteGroupWithoutPreservingOrder(updatedAllGroups, idx)
-			}
 			groupHandle.Unlock()
 		}
 	}
@@ -175,7 +149,8 @@
 	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
 	response := coreutils.NewResponse()
 	if !dType.AcceptsAddRemoveFlowUpdates {
-		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: []*ofp.OfpFlowStats{}}, &voltha.FlowGroups{Items: updatedAllGroups}, flowMetadata)
+		updatedAllGroups := agent.listDeviceGroups()
+		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, nil, updatedAllGroups, flowMetadata)
 		if err != nil {
 			cancel()
 			return coreutils.DoneResponse(), err
@@ -217,18 +192,8 @@
 	if err != nil {
 		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
 	}
-	updatedAllGroups := make([]*ofp.OfpGroupEntry, 0)
-	if !dType.AcceptsAddRemoveFlowUpdates {
-		groupIDs := agent.groupLoader.ListIDs()
-		for groupID := range groupIDs {
-			if grpHandle, have := agent.groupLoader.Lock(groupID); have {
-				updatedAllGroups = append(updatedAllGroups, grpHandle.GetReadOnly())
-				grpHandle.Unlock()
-			}
-		}
-	}
-	groupsToUpdate := make([]*ofp.OfpGroupEntry, 0)
 
+	groupsToUpdate := make([]*ofp.OfpGroupEntry, 0)
 	for _, group := range updatedGroups {
 		if groupHandle, have := agent.groupLoader.Lock(group.Desc.GroupId); have {
 			// Update the store and cache
@@ -237,7 +202,6 @@
 				return coreutils.DoneResponse(), err
 			}
 			groupsToUpdate = append(groupsToUpdate, group)
-			updatedAllGroups = replaceGroupInList(updatedAllGroups, groupHandle.GetReadOnly(), group)
 			groupHandle.Unlock()
 		}
 	}
@@ -246,7 +210,8 @@
 	response := coreutils.NewResponse()
 	// Process bulk flow update differently than incremental update
 	if !dType.AcceptsAddRemoveFlowUpdates {
-		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: []*ofp.OfpFlowStats{}}, &voltha.FlowGroups{Items: updatedAllGroups}, nil)
+		updatedAllGroups := agent.listDeviceGroups()
+		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, nil, updatedAllGroups, nil)
 		if err != nil {
 			cancel()
 			return coreutils.DoneResponse(), err
@@ -285,20 +250,3 @@
 
 	return response, nil
 }
-
-//replaceGroupInList removes the old group from list and adds the new one.
-func replaceGroupInList(groupList []*ofp.OfpGroupEntry, oldGroup *ofp.OfpGroupEntry, newGroup *ofp.OfpGroupEntry) []*ofp.OfpGroupEntry {
-	if idx := fu.FindGroup(groupList, oldGroup.Desc.GroupId); idx != -1 {
-		groupList = deleteGroupWithoutPreservingOrder(groupList, idx)
-	}
-	groupList = append(groupList, newGroup)
-	return groupList
-}
-
-//deleteGroupWithoutPreservingOrder removes a group specified by index from the groups slice.  This function will
-//panic if the index is out of range.
-func deleteGroupWithoutPreservingOrder(groups []*ofp.OfpGroupEntry, index int) []*ofp.OfpGroupEntry {
-	groups[index] = groups[len(groups)-1]
-	groups[len(groups)-1] = nil
-	return groups[:len(groups)-1]
-}
diff --git a/rw_core/core/device/remote/adapter_proxy.go b/rw_core/core/device/remote/adapter_proxy.go
index f6b0e9e..6654faf 100755
--- a/rw_core/core/device/remote/adapter_proxy.go
+++ b/rw_core/core/device/remote/adapter_proxy.go
@@ -22,7 +22,7 @@
 	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	ic "github.com/opencord/voltha-protos/v3/go/inter_container"
-	"github.com/opencord/voltha-protos/v3/go/openflow_13"
+	ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
 )
 
@@ -261,7 +261,7 @@
 	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
-func (ap *AdapterProxy) PacketOut(ctx context.Context, deviceType string, deviceID string, outPort uint32, packet *openflow_13.OfpPacketOut) (chan *kafka.RpcResponse, error) {
+func (ap *AdapterProxy) PacketOut(ctx context.Context, deviceType string, deviceID string, outPort uint32, packet *ofp.OfpPacketOut) (chan *kafka.RpcResponse, error) {
 	logger.Debugw(ctx, "PacketOut", log.Fields{"device-id": deviceID, "device-type": deviceType, "out-port": outPort})
 	toTopic, err := ap.getAdapterTopic(ctx, deviceID, deviceType)
 	if err != nil {
@@ -278,17 +278,28 @@
 }
 
 // UpdateFlowsBulk invokes update flows bulk rpc
-func (ap *AdapterProxy) UpdateFlowsBulk(ctx context.Context, device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata) (chan *kafka.RpcResponse, error) {
-	logger.Debugw(ctx, "UpdateFlowsBulk", log.Fields{"device-id": device.Id, "flow-count": len(flows.Items), "group-count": len(groups.Items), "flow-metadata": flowMetadata})
+func (ap *AdapterProxy) UpdateFlowsBulk(ctx context.Context, device *voltha.Device, flows map[uint64]*ofp.OfpFlowStats, groups map[uint32]*voltha.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (chan *kafka.RpcResponse, error) {
+	logger.Debugw(ctx, "UpdateFlowsBulk", log.Fields{"device-id": device.Id, "flow-count": len(flows), "group-count": len(groups), "flow-metadata": flowMetadata})
 	toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
 	if err != nil {
 		return nil, err
 	}
 	rpc := "update_flows_bulk"
+
+	ctr, flowSlice := 0, make([]*ofp.OfpFlowStats, len(flows))
+	for _, flow := range flows {
+		flowSlice[ctr] = flow
+		ctr++
+	}
+	ctr, groupSlice := 0, make([]*ofp.OfpGroupEntry, len(groups))
+	for _, group := range groups {
+		groupSlice[ctr] = group
+		ctr++
+	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
-		{Key: "flows", Value: flows},
-		{Key: "groups", Value: groups},
+		{Key: "flows", Value: &voltha.Flows{Items: flowSlice}},
+		{Key: "groups", Value: &voltha.FlowGroups{Items: groupSlice}},
 		{Key: "flow_metadata", Value: flowMetadata},
 	}
 	replyToTopic := ap.getCoreTopic()
@@ -296,7 +307,7 @@
 }
 
 // UpdateFlowsIncremental invokes update flows incremental rpc
-func (ap *AdapterProxy) UpdateFlowsIncremental(ctx context.Context, device *voltha.Device, flowChanges *openflow_13.FlowChanges, groupChanges *openflow_13.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) (chan *kafka.RpcResponse, error) {
+func (ap *AdapterProxy) UpdateFlowsIncremental(ctx context.Context, device *voltha.Device, flowChanges *ofp.FlowChanges, groupChanges *ofp.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) (chan *kafka.RpcResponse, error) {
 	logger.Debugw(ctx, "UpdateFlowsIncremental",
 		log.Fields{
 			"device-id":             device.Id,
diff --git a/rw_core/core/device/remote/adapter_proxy_test.go b/rw_core/core/device/remote/adapter_proxy_test.go
index 46de80f..151483a 100755
--- a/rw_core/core/device/remote/adapter_proxy_test.go
+++ b/rw_core/core/device/remote/adapter_proxy_test.go
@@ -198,7 +198,7 @@
 func testFlowUpdates(t *testing.T) {
 	ap := NewAdapterProxy(coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager())
 	d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
-	_, err := ap.UpdateFlowsBulk(context.Background(), d, &voltha.Flows{}, &voltha.FlowGroups{}, &voltha.FlowMetadata{})
+	_, err := ap.UpdateFlowsBulk(context.Background(), d, nil, nil, &voltha.FlowMetadata{})
 	assert.Nil(t, err)
 	flowChanges := &voltha.FlowChanges{ToAdd: &voltha.Flows{Items: nil}, ToRemove: &voltha.Flows{Items: nil}}
 	groupChanges := &voltha.FlowGroupChanges{ToAdd: &voltha.FlowGroups{Items: nil}, ToRemove: &voltha.FlowGroups{Items: nil}, ToUpdate: &voltha.FlowGroups{Items: nil}}