Packet in/out streaming from ofagent to core
Getting ready for packet streaming

Change-Id: I8d70d4d6ffbb23c0d7ab20582e9afac49f9f6461

Support flow_delete_strict

Change-Id: I5dab5f74a7daddcddfeb8691a3940347cb2fc11b

Packet-out path partially plumbed (work in progress)

Change-Id: I799d3f59d42ac9de0563b5e6b9a0064fd895a6f6

Refactor async_twisted

Change-Id: I68f8d12ce6fdbb70cee398f581669529b567d94d

Packet in pipeline and ofagent refactoring

Change-Id: I31ecbf7d52fdd18c3884b8d1870f673488f808df
diff --git a/experiments/streaming_client.py b/experiments/streaming_client.py
index 3d93092..0bf1136 100644
--- a/experiments/streaming_client.py
+++ b/experiments/streaming_client.py
@@ -1,16 +1,17 @@
 #!/usr/bin/env python
 
 import time
+from Queue import Queue
 
 import grpc
 from google.protobuf.empty_pb2 import Empty
 from twisted.internet import reactor
 from twisted.internet import threads
-from twisted.internet.defer import Deferred, inlineCallbacks, DeferredQueue
+from twisted.internet.defer import Deferred, inlineCallbacks, DeferredQueue, \
+    returnValue
 
 from common.utils.asleep import asleep
-from streaming_pb2 import ExperimentalServiceStub, Echo
-
+from streaming_pb2 import ExperimentalServiceStub, Echo, Packet
 
 t0 = time.time()
 
@@ -52,6 +53,36 @@
                 pr('event received: %s %s %s' % (
                    event.seq, event.type, event.details))
 
+    @inlineCallbacks
+    def send_packet_stream(self, stub, interval):
+        """Stream test Packets to stub.SendPackets, at most one per sleep.
+
+        SendPackets is called via reactor.callInThread and pulls from a
+        Queue that this coroutine tops up while it holds under 100 items.
+        Loops forever; `interval` is passed to asleep() (presumably secs).
+        """
+        queue = Queue()
+
+        def packet_generator():
+            # Blocking get() is fine: this is consumed off the reactor thread
+            while 1:
+                yield queue.get(block=True)
+
+        def stream(stub):
+            """This is executed on its own thread"""
+            result = stub.SendPackets(packet_generator())
+            print 'Got this after sending packets:', result, type(result)
+            return result
+
+        reactor.callInThread(stream, stub)
+
+        while 1:
+            # Skip the put when the backlog is long so it stays bounded.
+            # qsize() is only approximate across threads; enough for a cap.
+            if queue.qsize() < 100:
+                queue.put(Packet(source=42, content='beefstew'))
+            yield asleep(interval)
+
 
 if __name__ == '__main__':
     client_services = ClientServices()
@@ -60,4 +91,5 @@
     reactor.callLater(0, client_services.echo_loop, stub, '', 0.2)
     reactor.callLater(0, client_services.echo_loop, stub, 40*' ', 2)
     reactor.callLater(0, client_services.receive_async_events, stub)
+    reactor.callLater(0, client_services.send_packet_stream, stub, 0.0000001)
     reactor.run()