[VOL-1037, VOL-1035] This commit consists of flow and group
handling (from the NBI to the adapters, including decomposition).

Change-Id: I4f6d9ecd3dee8a9b161708b20b0a68d030c0cb23
diff --git a/adapters/kafka/adapter_request_facade.py b/adapters/kafka/adapter_request_facade.py
index f4898a3..3009206 100644
--- a/adapters/kafka/adapter_request_facade.py
+++ b/adapters/kafka/adapter_request_facade.py
@@ -42,6 +42,7 @@
 from adapters.common.utils.registry import registry
 from adapters.common.utils.id_generation import create_cluster_device_id
 from adapters.protos.core_adapter_pb2 import IntType
+from adapters.protos.openflow_13_pb2 import FlowChanges, FlowGroups, Flows, FlowGroupChanges
 import re
 
 
@@ -167,10 +168,38 @@
         return self.adapter.get_device_details(device)
 
     def update_flows_bulk(self, device, flows, groups):
-        return self.adapter.update_flows_bulk(device, flows, groups)
+        d = Device()
+        if device:
+            device.Unpack(d)
+        else:
+            return (False, d)
+
+        f = Flows()
+        if flows:
+            flows.Unpack(f)
+
+        g = FlowGroups()
+        if groups:
+            groups.Unpack(g)
+
+        return (True, self.adapter.update_flows_bulk(d, f, g))
 
     def update_flows_incrementally(self, device, flow_changes, group_changes):
-        return self.adapter.update_flows_incrementally(device, flow_changes, group_changes)
+        d = Device()
+        if device:
+            device.Unpack(d)
+        else:
+            return (False, d)
+
+        f = FlowChanges()
+        if flow_changes:
+            flow_changes.Unpack(f)
+
+        g = FlowGroupChanges()
+        if group_changes:
+            group_changes.Unpack(g)
+
+        return (True, self.adapter.update_flows_incrementally(d, f, g))
 
     def suppress_alarm(self, filter):
         return self.adapter.suppress_alarm(filter)