blob: 0bf113685a1822606fd7bb0695ad5d2c0d26fb12 [file] [log] [blame]
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -07001#!/usr/bin/env python
2
Zsolt Harasztie39523b2016-10-16 19:30:34 -07003import time
Zsolt Haraszticd22adc2016-10-25 00:13:06 -07004from Queue import Queue
Zsolt Harasztie39523b2016-10-16 19:30:34 -07005
6import grpc
7from google.protobuf.empty_pb2 import Empty
8from twisted.internet import reactor
9from twisted.internet import threads
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070010from twisted.internet.defer import Deferred, inlineCallbacks, DeferredQueue, \
11 returnValue
Zsolt Harasztie39523b2016-10-16 19:30:34 -070012
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070013from common.utils.asleep import asleep
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070014from streaming_pb2 import ExperimentalServiceStub, Echo, Packet
Zsolt Harasztie39523b2016-10-16 19:30:34 -070015
# Reference timestamp taken once at import time; pr() reports elapsed
# seconds relative to it.
t0 = time.time()


def pr(s):
    """Print ``s`` prefixed with the seconds elapsed since ``t0``.

    :param s: value to print; it is rendered with ``%s`` formatting
    """
    # Single-argument parenthesized form: same output under Python 2's
    # print statement and Python 3's print function.
    print('%lf %s' % (time.time() - t0, s))
21
22
class ClientServices(object):
    """Reactor-side coroutines that exercise a streaming gRPC service stub."""

    def async_receive_stream(self, func, *args, **kw):
        """Expose a blocking iterator as a stream of Deferreds.

        ``func(*args, **kw)`` is called and iterated on a worker thread
        (``threads.deferToThread``); every item it produces is handed back
        to the reactor thread and put on a ``DeferredQueue``.

        :param func: callable returning an iterable when invoked with
            ``*args`` / ``**kw``
        :return: a generator; each value it yields is the Deferred returned
            by ``DeferredQueue.get()`` for the next item

        NOTE: the generator has no end condition -- once the iterable from
        ``func`` is exhausted, Deferreds yielded afterwards are never fired
        by this method. The worker's own Deferred is discarded, so a failure
        raised while iterating is not reported to the consumer.
        """
        queue = DeferredQueue()

        def _execute():
            # Runs off the reactor thread; DeferredQueue.put must be invoked
            # on the reactor thread, hence callFromThread.
            for result in func(*args, **kw):
                reactor.callFromThread(queue.put, result)

        _ = threads.deferToThread(_execute)
        while 1:
            yield queue.get()

    @inlineCallbacks
    def echo_loop(self, stub, prefix='', interval=1.0):
        """Send an echo message and print its return value, forever.

        :param stub: object whose blocking ``GetEcho(request)`` is called on
            a worker thread
        :param prefix: text placed in front of each printed line
        :param interval: passed as the ``delay`` field of every ``Echo``
            request and also used as the pause (via ``asleep``) between
            rounds
        """
        seq = 0
        while 1:
            msg = 'ECHO%05d' % seq
            pr('{}sending echo {}'.format(prefix, msg))
            request = Echo(msg=msg, delay=interval)
            # GetEcho blocks, so it is pushed to the thread pool.
            response = yield threads.deferToThread(stub.GetEcho, request)
            pr('{} got echo {}'.format(prefix, response.msg))
            seq += 1
            yield asleep(interval)

    @inlineCallbacks
    def receive_async_events(self, stub):
        """Consume ``stub.ReceiveStreamedEvents`` and print every 100th event.

        :param stub: object whose ``ReceiveStreamedEvents(Empty())`` returns
            an iterable of events carrying ``seq``, ``type`` and ``details``
        """
        e = Empty()
        for pending in self.async_receive_stream(
                stub.ReceiveStreamedEvents, e):
            event = yield pending
            # Only report events whose sequence number is a multiple of 100.
            if event.seq % 100 == 0:
                pr('event received: %s %s %s' % (
                    event.seq, event.type, event.details))

    @inlineCallbacks
    def send_packet_stream(self, stub, interval):
        """Feed an endless stream of packets to ``stub.SendPackets``.

        A thread-safe ``Queue`` connects the two sides: this coroutine
        produces packets on the reactor thread, while ``stub.SendPackets``
        consumes them through a blocking generator on a thread started with
        ``reactor.callInThread``.

        :param stub: object whose ``SendPackets(iterator)`` consumes packets
        :param interval: pause (via ``asleep``) between producer iterations
        """
        queue = Queue()

        def packet_generator():
            """Block on the queue and yield packets as they arrive."""
            while 1:
                yield queue.get(block=True)

        def stream(stub):
            """This is executed on its own thread"""
            result = stub.SendPackets(packet_generator())
            print('Got this after sending packets: %s %s'
                  % (result, type(result)))
            return result

        reactor.callInThread(stream, stub)

        while 1:
            # Only enqueue while fewer than 100 packets are waiting, so the
            # producer cannot run arbitrarily far ahead of the sender.
            if queue.qsize() < 100:
                queue.put(Packet(source=42, content='beefstew'))
            yield asleep(interval)
85
Zsolt Harasztie39523b2016-10-16 19:30:34 -070086
if __name__ == '__main__':
    # Connect to the local test server and build the service stub.
    client_services = ClientServices()
    channel = grpc.insecure_channel('localhost:50050')
    stub = ExperimentalServiceStub(channel)

    # (callable, positional args) pairs, scheduled in this order as soon as
    # the reactor starts: a fast and a slow echo loop, the event receiver,
    # and the packet sender.
    startup_calls = (
        (client_services.echo_loop, (stub, '', 0.2)),
        (client_services.echo_loop, (stub, 40*' ', 2)),
        (client_services.receive_async_events, (stub,)),
        (client_services.send_packet_stream, (stub, 0.0000001)),
    )
    for task, task_args in startup_calls:
        reactor.callLater(0, task, *task_args)

    reactor.run()