[VOL-1037, VOL-1035] This commit consists of flow and group
handling (from NBI to Adapters, including decomposition).

Change-Id: I4f6d9ecd3dee8a9b161708b20b0a68d030c0cb23
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index 82c6dec..643c9de 100644
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -22,6 +22,7 @@
 	"github.com/opencord/voltha-go/common/log"
 	"github.com/opencord/voltha-go/kafka"
 	ca "github.com/opencord/voltha-go/protos/core_adapter"
+	"github.com/opencord/voltha-go/protos/openflow_13"
 	"github.com/opencord/voltha-go/protos/voltha"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
@@ -249,14 +250,50 @@
 	return nil, nil
 }
 
-func (ap *AdapterProxy) UpdateFlowsBulk(device voltha.Device, flows voltha.Flows, groups voltha.FlowGroups) error {
-	log.Debug("UpdateFlowsBulk")
-	return nil
+// UpdateFlowsBulk invokes the "update_flows_bulk" RPC on the Kafka topic named
+// after device.Type, passing the device, flows and groups as the "device",
+// "flows" and "groups" key/value arguments.
+//
+// The returned error is whatever unPackResponse derives from the (success,
+// result) pair of that call. device is dereferenced, so it must not be nil.
+func (ap *AdapterProxy) UpdateFlowsBulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups) error {
+	log.Debugw("UpdateFlowsBulk", log.Fields{"deviceId": device.Id})
+	rpc := "update_flows_bulk"
+	adapterTopic := kafka.Topic{Name: device.Type}
+	kvArgs := []*kafka.KVArg{
+		{Key: "device", Value: device},
+		{Key: "flows", Value: flows},
+		{Key: "groups", Value: groups},
+	}
+
+	// NOTE(review): the literal true is presumably a wait-for-response flag and
+	// the nil first argument a placeholder context - confirm against InvokeRPC.
+	ok, reply := ap.kafkaProxy.InvokeRPC(nil, rpc, &adapterTopic, true, kvArgs...)
+	log.Debugw("UpdateFlowsBulk-response", log.Fields{"deviceid": device.Id, "success": ok})
+	return unPackResponse(rpc, device.Id, ok, reply)
 }
 
-func (ap *AdapterProxy) UpdateFlowsIncremental(device voltha.Device, flowChanges voltha.Flows, groupChanges voltha.FlowGroups) error {
-	log.Debug("UpdateFlowsIncremental")
-	return nil
+// UpdateFlowsIncremental invokes the "update_flows_incremental" RPC on the
+// Kafka topic named after device.Type, passing the device and the two change
+// sets as the "device", "flow_changes" and "group_changes" arguments.
+//
+// The returned error is whatever unPackResponse derives from the (success,
+// result) pair of that call. device is dereferenced, so it must not be nil.
+func (ap *AdapterProxy) UpdateFlowsIncremental(device *voltha.Device, flowChanges *openflow_13.FlowChanges, groupChanges *openflow_13.FlowGroupChanges) error {
+	log.Debugw("UpdateFlowsIncremental", log.Fields{"deviceId": device.Id})
+	topic := kafka.Topic{Name: device.Type}
+	// The incremental call has its own RPC name; it must not go out as "update_flows_bulk".
+	rpc := "update_flows_incremental"
+	args := []*kafka.KVArg{
+		{Key: "device", Value: device},
+		{Key: "flow_changes", Value: flowChanges},
+		{Key: "group_changes", Value: groupChanges},
+	}
+
+	success, result := ap.kafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
+	log.Debugw("UpdateFlowsIncremental-response", log.Fields{"deviceid": device.Id, "success": success})
+	return unPackResponse(rpc, device.Id, success, result)
 }
 
 func (ap *AdapterProxy) UpdatePmConfig(device voltha.Device, pmConfigs voltha.PmConfigs) error {