Packet in/out streaming from ofagent to core
Getting ready for packet streaming

Change-Id: I8d70d4d6ffbb23c0d7ab20582e9afac49f9f6461

Support flow_delete_strict

Change-Id: I5dab5f74a7daddcddfeb8691a3940347cb2fc11b

Packet out halfway plumbed

Change-Id: I799d3f59d42ac9de0563b5e6b9a0064fd895a6f6

refactored async_twisted

Change-Id: I68f8d12ce6fdbb70cee398f581669529b567d94d

Packet in pipeline and ofagent refactoring

Change-Id: I31ecbf7d52fdd18c3884b8d1870f673488f808df
diff --git a/experiments/streaming_client.py b/experiments/streaming_client.py
index 3d93092..0bf1136 100644
--- a/experiments/streaming_client.py
+++ b/experiments/streaming_client.py
@@ -1,16 +1,17 @@
 #!/usr/bin/env python
 
 import time
+from Queue import Queue
 
 import grpc
 from google.protobuf.empty_pb2 import Empty
 from twisted.internet import reactor
 from twisted.internet import threads
-from twisted.internet.defer import Deferred, inlineCallbacks, DeferredQueue
+from twisted.internet.defer import Deferred, inlineCallbacks, DeferredQueue, \
+    returnValue
 
 from common.utils.asleep import asleep
-from streaming_pb2 import ExperimentalServiceStub, Echo
-
+from streaming_pb2 import ExperimentalServiceStub, Echo, Packet
 
 t0 = time.time()
 
@@ -52,6 +53,36 @@
                 pr('event received: %s %s %s' % (
                    event.seq, event.type, event.details))
 
+    @inlineCallbacks
+    def send_packet_stream(self, stub, interval):
+        queue = Queue()
+
+        @inlineCallbacks
+        def get_next_from_queue():
+            packet = yield queue.get()
+            returnValue(packet)
+
+        def packet_generator():
+            while 1:
+                packet = queue.get(block=True)
+                yield packet
+
+        def stream(stub):
+            """This is executed on its own thread"""
+            generator = packet_generator()
+            result = stub.SendPackets(generator)
+            print 'Got this after sending packets:', result, type(result)
+            return result
+
+        reactor.callInThread(stream, stub)
+
+        while 1:
+            len = queue.qsize()
+            if len < 100:
+                packet = Packet(source=42, content='beefstew')
+                queue.put(packet)
+            yield asleep(interval)
+
 
 if __name__ == '__main__':
     client_services = ClientServices()
@@ -60,4 +91,5 @@
     reactor.callLater(0, client_services.echo_loop, stub, '', 0.2)
     reactor.callLater(0, client_services.echo_loop, stub, 40*' ', 2)
     reactor.callLater(0, client_services.receive_async_events, stub)
+    reactor.callLater(0, client_services.send_packet_stream, stub, 0.0000001)
     reactor.run()
diff --git a/experiments/streaming_server.py b/experiments/streaming_server.py
index 588d4b3..c2ab286 100644
--- a/experiments/streaming_server.py
+++ b/experiments/streaming_server.py
@@ -7,7 +7,9 @@
 from twisted.internet.defer import Deferred, inlineCallbacks, returnValue
 
 from common.utils.asleep import asleep
+from google.protobuf.empty_pb2 import Empty
 
+from common.utils.grpc_utils import twisted_async
 from streaming_pb2 import add_ExperimentalServiceServicer_to_server, \
     AsyncEvent, ExperimentalServiceServicer, Echo
 
@@ -19,57 +21,6 @@
 class ShuttingDown(Exception): pass
 
 
-def twisted_async(func):
-    """
-    This decorator can be used to implement a gRPC method on the twisted
-    thread, allowing asynchronous programming in Twisted while serving
-    a gRPC call.
-
-    gRPC methods normally are called on the futures.ThreadPool threads,
-    so these methods cannot directly use Twisted protocol constructs.
-    If the implementation of the methods needs to touch Twisted, it is
-    safer (or mandatory) to wrap the method with this decorator, which will
-    call the inner method from the external thread and ensure that the
-    result is passed back to the foreign thread.
-    """
-    def in_thread_wrapper(*args, **kw):
-
-        if ShutDown.stop:
-            raise ShuttingDown()
-        f = Future()
-
-        def twisted_wrapper():
-            try:
-                d = func(*args, **kw)
-                if isinstance(d, Deferred):
-
-                    def _done(result):
-                        f.set_result(result)
-                        f.done()
-
-                    def _error(e):
-                        f.set_exception(e)
-                        f.done()
-
-                    d.addCallback(_done)
-                    d.addErrback(_error)
-
-                else:
-                    f.set_result(d)
-                    f.done()
-
-            except Exception, e:
-                f.set_exception(e)
-                f.done()
-
-        reactor.callFromThread(twisted_wrapper)
-        result = f.result()
-
-        return result
-
-    return in_thread_wrapper
-
-
 class Service(ExperimentalServiceServicer):
 
     def __init__(self):
@@ -88,7 +39,7 @@
     @inlineCallbacks
     def get_next_event(self):
         """called on the twisted thread"""
-        yield asleep(0.0001)
+        yield asleep(0.000001)
         event = AsyncEvent(seq=self.event_seq, details='foo')
         self.event_seq += 1
         returnValue(event)
@@ -105,7 +56,12 @@
         pass
 
     def SendPackets(self, request, context):
-        pass
+        count = 0
+        for _ in request:
+            count += 1
+            if count % 1000 == 0:
+                print '%s got %d packets' % (20 * ' ', count)
+        return Empty()
 
 
 if __name__ == '__main__':