Packet in/out streaming from ofagent to core
Getting ready for packet streaming
Change-Id: I8d70d4d6ffbb23c0d7ab20582e9afac49f9f6461
Support flow_delete_strict
Change-Id: I5dab5f74a7daddcddfeb8691a3940347cb2fc11b
Packet out halfway plumbed
Change-Id: I799d3f59d42ac9de0563b5e6b9a0064fd895a6f6
Refactored async_twisted
Change-Id: I68f8d12ce6fdbb70cee398f581669529b567d94d
Packet in pipeline and ofagent refactoring
Change-Id: I31ecbf7d52fdd18c3884b8d1870f673488f808df
diff --git a/experiments/streaming_client.py b/experiments/streaming_client.py
index 3d93092..0bf1136 100644
--- a/experiments/streaming_client.py
+++ b/experiments/streaming_client.py
@@ -1,16 +1,17 @@
#!/usr/bin/env python
import time
+from Queue import Queue
import grpc
from google.protobuf.empty_pb2 import Empty
from twisted.internet import reactor
from twisted.internet import threads
-from twisted.internet.defer import Deferred, inlineCallbacks, DeferredQueue
+from twisted.internet.defer import Deferred, inlineCallbacks, DeferredQueue, \
+ returnValue
from common.utils.asleep import asleep
-from streaming_pb2 import ExperimentalServiceStub, Echo
-
+from streaming_pb2 import ExperimentalServiceStub, Echo, Packet
t0 = time.time()
@@ -52,6 +53,36 @@
pr('event received: %s %s %s' % (
event.seq, event.type, event.details))
+    @inlineCallbacks
+    def send_packet_stream(self, stub, interval):
+        """Keep feeding Packet messages into a single SendPackets call.
+
+        A Queue links two parties: the loop at the bottom of this method
+        (the producer) and a generator that is passed to stub.SendPackets
+        (the consumer). SendPackets is started through
+        reactor.callInThread, so the blocking queue reads performed by the
+        generator happen off the thread that runs this method.
+
+        :param stub: object exposing SendPackets(iterator of Packet)
+        :param interval: delay, in seconds, passed to asleep() between
+            producer iterations
+        :return: a Deferred (via inlineCallbacks); the producer loop has no
+            exit condition, so it does not fire under normal operation
+        """
+        queue = Queue()
+
+        def packet_generator():
+            # Blocks until the producer has queued another packet, then
+            # hands it to whoever is iterating (SendPackets).
+            while True:
+                yield queue.get(block=True)
+
+        def stream(stub):
+            """This is executed on its own thread"""
+            generator = packet_generator()
+            result = stub.SendPackets(generator)
+            print 'Got this after sending packets:', result, type(result)
+            return result
+
+        reactor.callInThread(stream, stub)
+
+        while True:
+            # Only enqueue while fewer than 100 packets are pending, so the
+            # queue cannot grow without bound if the stream drains slowly.
+            backlog = queue.qsize()
+            if backlog < 100:
+                packet = Packet(source=42, content='beefstew')
+                queue.put(packet)
+            yield asleep(interval)
+
if __name__ == '__main__':
client_services = ClientServices()
@@ -60,4 +91,5 @@
reactor.callLater(0, client_services.echo_loop, stub, '', 0.2)
reactor.callLater(0, client_services.echo_loop, stub, 40*' ', 2)
reactor.callLater(0, client_services.receive_async_events, stub)
+ reactor.callLater(0, client_services.send_packet_stream, stub, 0.0000001)
reactor.run()