Packet in/out streaming from ofagent to core
Getting ready for packet streaming
Change-Id: I8d70d4d6ffbb23c0d7ab20582e9afac49f9f6461
Support flow_delete_strict
Change-Id: I5dab5f74a7daddcddfeb8691a3940347cb2fc11b
Packet-out path partially plumbed
Change-Id: I799d3f59d42ac9de0563b5e6b9a0064fd895a6f6
Refactored twisted_async (moved into common.utils.grpc_utils)
Change-Id: I68f8d12ce6fdbb70cee398f581669529b567d94d
Packet in pipeline and ofagent refactoring
Change-Id: I31ecbf7d52fdd18c3884b8d1870f673488f808df
diff --git a/experiments/streaming_client.py b/experiments/streaming_client.py
index 3d93092..0bf1136 100644
--- a/experiments/streaming_client.py
+++ b/experiments/streaming_client.py
@@ -1,16 +1,17 @@
#!/usr/bin/env python
import time
+from Queue import Queue
import grpc
from google.protobuf.empty_pb2 import Empty
from twisted.internet import reactor
from twisted.internet import threads
-from twisted.internet.defer import Deferred, inlineCallbacks, DeferredQueue
+from twisted.internet.defer import Deferred, inlineCallbacks, DeferredQueue, \
+ returnValue
from common.utils.asleep import asleep
-from streaming_pb2 import ExperimentalServiceStub, Echo
-
+from streaming_pb2 import ExperimentalServiceStub, Echo, Packet
t0 = time.time()
@@ -52,6 +53,36 @@
pr('event received: %s %s %s' % (
event.seq, event.type, event.details))
+    @inlineCallbacks
+    def send_packet_stream(self, stub, interval, max_queued=100):
+        """Feed an endless stream of Packet messages to stub.SendPackets.
+
+        One packet is queued per `interval` tick of asleep() while fewer
+        than `max_queued` are pending; the blocking SendPackets call itself
+        is started via reactor.callInThread so it never stalls this loop.
+        """
+        queue = Queue()
+
+        def packet_generator():
+            while 1:
+                # Blocks the iterating thread until a packet is available.
+                yield queue.get(block=True)
+
+        def stream(stub):
+            """This is executed on its own thread"""
+            generator = packet_generator()
+            result = stub.SendPackets(generator)
+            print 'Got this after sending packets:', result, type(result)
+            return result
+
+        reactor.callInThread(stream, stub)
+
+        while 1:
+            if queue.qsize() < max_queued:
+                packet = Packet(source=42, content='beefstew')
+                queue.put(packet)
+            yield asleep(interval)
+
if __name__ == '__main__':
client_services = ClientServices()
@@ -60,4 +91,5 @@
reactor.callLater(0, client_services.echo_loop, stub, '', 0.2)
reactor.callLater(0, client_services.echo_loop, stub, 40*' ', 2)
reactor.callLater(0, client_services.receive_async_events, stub)
+ reactor.callLater(0, client_services.send_packet_stream, stub, 0.0000001)
reactor.run()
diff --git a/experiments/streaming_server.py b/experiments/streaming_server.py
index 588d4b3..c2ab286 100644
--- a/experiments/streaming_server.py
+++ b/experiments/streaming_server.py
@@ -7,7 +7,9 @@
from twisted.internet.defer import Deferred, inlineCallbacks, returnValue
from common.utils.asleep import asleep
+from google.protobuf.empty_pb2 import Empty
+from common.utils.grpc_utils import twisted_async
from streaming_pb2 import add_ExperimentalServiceServicer_to_server, \
AsyncEvent, ExperimentalServiceServicer, Echo
@@ -19,57 +21,6 @@
class ShuttingDown(Exception): pass
-def twisted_async(func):
- """
- This decorator can be used to implement a gRPC method on the twisted
- thread, allowing asynchronous programming in Twisted while serving
- a gRPC call.
-
- gRPC methods normally are called on the futures.ThreadPool threads,
- so these methods cannot directly use Twisted protocol constructs.
- If the implementation of the methods needs to touch Twisted, it is
- safer (or mandatory) to wrap the method with this decorator, which will
- call the inner method from the external thread and ensure that the
- result is passed back to the foreign thread.
- """
- def in_thread_wrapper(*args, **kw):
-
- if ShutDown.stop:
- raise ShuttingDown()
- f = Future()
-
- def twisted_wrapper():
- try:
- d = func(*args, **kw)
- if isinstance(d, Deferred):
-
- def _done(result):
- f.set_result(result)
- f.done()
-
- def _error(e):
- f.set_exception(e)
- f.done()
-
- d.addCallback(_done)
- d.addErrback(_error)
-
- else:
- f.set_result(d)
- f.done()
-
- except Exception, e:
- f.set_exception(e)
- f.done()
-
- reactor.callFromThread(twisted_wrapper)
- result = f.result()
-
- return result
-
- return in_thread_wrapper
-
-
class Service(ExperimentalServiceServicer):
def __init__(self):
@@ -88,7 +39,7 @@
@inlineCallbacks
def get_next_event(self):
"""called on the twisted thread"""
- yield asleep(0.0001)
+ yield asleep(0.000001)  # delay cut 100x vs. the removed line; unit presumably seconds (confirm in asleep)
event = AsyncEvent(seq=self.event_seq, details='foo')
self.event_seq += 1
returnValue(event)
@@ -105,7 +56,12 @@
pass
def SendPackets(self, request, context):
- pass
+        # Consume the incoming packet stream, counting from 1 as we go.
+        for received, _ in enumerate(request, 1):
+            if received % 1000 == 0:
+                print '%s got %d packets' % (20 * ' ', received)
+        # Reply with an empty message once the request stream is exhausted.
+        return Empty()
if __name__ == '__main__':