Packet in/out streaming from ofagent to core
Getting ready for packet streaming

Change-Id: I8d70d4d6ffbb23c0d7ab20582e9afac49f9f6461

Support flow_delete_strict

Change-Id: I5dab5f74a7daddcddfeb8691a3940347cb2fc11b

Packet out halfway plumbed

Change-Id: I799d3f59d42ac9de0563b5e6b9a0064fd895a6f6

refactored async_twisted

Change-Id: I68f8d12ce6fdbb70cee398f581669529b567d94d

Packet in pipeline and ofagent refactoring

Change-Id: I31ecbf7d52fdd18c3884b8d1870f673488f808df
diff --git a/ofagent/ b/ofagent/
index 69bf940..efbc038 100644
--- a/ofagent/
+++ b/ofagent/
@@ -17,39 +17,86 @@
 The gRPC client layer for the OpenFlow agent
+from Queue import Queue
+from structlog import get_logger
+from twisted.internet import reactor
 from twisted.internet import threads
-from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.internet.defer import inlineCallbacks, returnValue, DeferredQueue
 from protos.voltha_pb2 import ID, VolthaLogicalLayerStub, FlowTableUpdate, \
-    GroupTableUpdate
+    GroupTableUpdate, NullMessage, PacketOut
+log = get_logger()
 class GrpcClient(object):
-    def __init__(self, channel, device_id_map):
+    def __init__(self, connection_manager, channel):
+        self.connection_manager = connection_manager
         self.channel = channel
-        self.device_id_map = device_id_map
         self.logical_stub = VolthaLogicalLayerStub(channel)
+        self.packet_out_queue = Queue()  # queue to send out PacketOut msgs
+        self.packet_in_queue = DeferredQueue()  # queue to receive PacketIn
+        self.start_packet_out_stream()
+        self.start_packet_in_stream()
+        reactor.callLater(0, self.packet_in_forwarder_loop)
+    def start_packet_out_stream(self):
+        def packet_generator():
+            while 1:
+                packet = self.packet_out_queue.get(block=True)
+                yield packet
+        def stream_packets_out():
+            generator = packet_generator()
+            self.logical_stub.StreamPacketsOut(generator)
+        reactor.callInThread(stream_packets_out)
+    def start_packet_in_stream(self):
+        def receive_packet_in_stream():
+            for packet_in in self.logical_stub.ReceivePacketsIn(NullMessage()):
+                reactor.callFromThread(self.packet_in_queue.put, packet_in)
+                log.debug('enqued-packet-in',
+                          packet_in=packet_in,
+                          queue_len=len(self.packet_in_queue.pending))
+        reactor.callInThread(receive_packet_in_stream)
-    def get_port_list(self, datapath_id):
-        device_id = self.device_id_map[datapath_id]
+    def packet_in_forwarder_loop(self):
+        while True:
+            packet_in = yield self.packet_in_queue.get()
+            device_id = packet_in.id
+            ofp_packet_in = packet_in.packet_in
+            self.connection_manager.forward_packet_in(device_id, ofp_packet_in)
+    def send_packet_out(self, device_id, packet_out):
+        packet_out = PacketOut(id=device_id, packet_out=packet_out)
+        self.packet_out_queue.put(packet_out)
+    @inlineCallbacks
+    def get_port_list(self, device_id):
         req = ID(id=device_id)
         res = yield threads.deferToThread(
             self.logical_stub.ListLogicalDevicePorts, req)
-    def get_device_info(self, datapath_id):
-        device_id = self.device_id_map[datapath_id]
+    def get_device_info(self, device_id):
         req = ID(id=device_id)
         res = yield threads.deferToThread(
             self.logical_stub.GetLogicalDevice, req)
-    def update_flow_table(self, datapath_id, flow_mod):
-        device_id = self.device_id_map[datapath_id]
+    def update_flow_table(self, device_id, flow_mod):
         req = FlowTableUpdate(
@@ -59,8 +106,7 @@
-    def update_group_table(self, datapath_id, group_mod):
-        device_id = self.device_id_map[datapath_id]
+    def update_group_table(self, device_id, group_mod):
         req = GroupTableUpdate(
@@ -70,16 +116,14 @@
-    def list_flows(self, datapath_id):
-        device_id = self.device_id_map[datapath_id]
+    def list_flows(self, device_id):
         req = ID(id=device_id)
         res = yield threads.deferToThread(
             self.logical_stub.ListDeviceFlows, req)
-    def list_groups(self, datapath_id):
-        device_id = self.device_id_map[datapath_id]
+    def list_groups(self, device_id):
         req = ID(id=device_id)
         res = yield threads.deferToThread(
             self.logical_stub.ListDeviceFlowGroups, req)