VOL-2867 - Simplified the logic for devices' flow & group updates.

Change-Id: I12b9916212dc66ecdf6d975de499dd767fac248f
diff --git a/rw_core/core/device/remote/adapter_proxy.go b/rw_core/core/device/remote/adapter_proxy.go
index f6b0e9e..6654faf 100755
--- a/rw_core/core/device/remote/adapter_proxy.go
+++ b/rw_core/core/device/remote/adapter_proxy.go
@@ -22,7 +22,7 @@
 	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	ic "github.com/opencord/voltha-protos/v3/go/inter_container"
-	"github.com/opencord/voltha-protos/v3/go/openflow_13"
+	ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
 )
 
@@ -261,7 +261,7 @@
 	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
-func (ap *AdapterProxy) PacketOut(ctx context.Context, deviceType string, deviceID string, outPort uint32, packet *openflow_13.OfpPacketOut) (chan *kafka.RpcResponse, error) {
+func (ap *AdapterProxy) PacketOut(ctx context.Context, deviceType string, deviceID string, outPort uint32, packet *ofp.OfpPacketOut) (chan *kafka.RpcResponse, error) {
 	logger.Debugw(ctx, "PacketOut", log.Fields{"device-id": deviceID, "device-type": deviceType, "out-port": outPort})
 	toTopic, err := ap.getAdapterTopic(ctx, deviceID, deviceType)
 	if err != nil {
@@ -278,17 +278,28 @@
 }
 
 // UpdateFlowsBulk invokes update flows bulk rpc
-func (ap *AdapterProxy) UpdateFlowsBulk(ctx context.Context, device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata) (chan *kafka.RpcResponse, error) {
-	logger.Debugw(ctx, "UpdateFlowsBulk", log.Fields{"device-id": device.Id, "flow-count": len(flows.Items), "group-count": len(groups.Items), "flow-metadata": flowMetadata})
+func (ap *AdapterProxy) UpdateFlowsBulk(ctx context.Context, device *voltha.Device, flows map[uint64]*ofp.OfpFlowStats, groups map[uint32]*voltha.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (chan *kafka.RpcResponse, error) {
+	logger.Debugw(ctx, "UpdateFlowsBulk", log.Fields{"device-id": device.Id, "flow-count": len(flows), "group-count": len(groups), "flow-metadata": flowMetadata})
 	toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
 	if err != nil {
 		return nil, err
 	}
 	rpc := "update_flows_bulk"
+
+	ctr, flowSlice := 0, make([]*ofp.OfpFlowStats, len(flows))
+	for _, flow := range flows {
+		flowSlice[ctr] = flow
+		ctr++
+	}
+	ctr, groupSlice := 0, make([]*ofp.OfpGroupEntry, len(groups))
+	for _, group := range groups {
+		groupSlice[ctr] = group
+		ctr++
+	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
-		{Key: "flows", Value: flows},
-		{Key: "groups", Value: groups},
+		{Key: "flows", Value: &voltha.Flows{Items: flowSlice}},
+		{Key: "groups", Value: &voltha.FlowGroups{Items: groupSlice}},
 		{Key: "flow_metadata", Value: flowMetadata},
 	}
 	replyToTopic := ap.getCoreTopic()
@@ -296,7 +307,7 @@
 }
 
 // UpdateFlowsIncremental invokes update flows incremental rpc
-func (ap *AdapterProxy) UpdateFlowsIncremental(ctx context.Context, device *voltha.Device, flowChanges *openflow_13.FlowChanges, groupChanges *openflow_13.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) (chan *kafka.RpcResponse, error) {
+func (ap *AdapterProxy) UpdateFlowsIncremental(ctx context.Context, device *voltha.Device, flowChanges *ofp.FlowChanges, groupChanges *ofp.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) (chan *kafka.RpcResponse, error) {
 	logger.Debugw(ctx, "UpdateFlowsIncremental",
 		log.Fields{
 			"device-id":             device.Id,