blob: e7593f93b3190a4cc82a6dc982254d76908156b8 [file] [log] [blame]
Zsolt Harasztie39523b2016-10-16 19:30:34 -07001import time
2
3import grpc
4from google.protobuf.empty_pb2 import Empty
5from twisted.internet import reactor
6from twisted.internet import threads
7from twisted.internet.defer import Deferred, inlineCallbacks, DeferredQueue
8
9from streaming_pb2 import ExperimentalServiceStub, Echo
10
11
def asleep(t):
    """Return a Deferred that the reactor fires after a delay.

    Args:
        t: delay handed to ``reactor.callLater`` (seconds, per the Twisted
            reactor API).

    Returns:
        A new ``Deferred`` whose callback chain is started with ``None``
        once the delay has elapsed.
    """
    wakeup = Deferred()
    fire = wakeup.callback
    # Let the reactor trigger the Deferred so that callers can simply
    # ``yield asleep(t)`` without blocking the reactor thread.
    reactor.callLater(t, fire, None)
    return wakeup
16
# Reference instant, taken when this module is first executed; pr() reports
# the time elapsed since then.
t0 = time.time()


def pr(s):
    """Print ``s`` prefixed with the seconds elapsed since ``t0``.

    Args:
        s: text to print after the elapsed-time stamp.
    """
    # A single parenthesised argument is accepted both by the Python 2
    # print statement and by the Python 3 print() function, and yields the
    # same output on either interpreter.
    print('%lf %s' % (time.time() - t0, s))
22
23
class ClientServices(object):
    """Client-side helpers that drive gRPC stub calls from the Twisted reactor.

    The stub calls are executed through ``threads.deferToThread`` and their
    results are consumed from ``inlineCallbacks`` coroutines.
    """

    def async_receive_stream(self, func, *args, **kw):
        """Adapt an iterable-returning call into a generator of Deferreds.

        ``func(*args, **kw)`` is iterated inside a function submitted with
        ``threads.deferToThread``; every item it produces is passed to the
        reactor via ``reactor.callFromThread`` and put on a
        ``DeferredQueue``.  Each value yielded by this generator is the
        ``queue.get()`` Deferred for the next queued item.

        Args:
            func: callable whose return value is iterated for stream items.
            *args: positional arguments forwarded to ``func``.
            **kw: keyword arguments forwarded to ``func``.

        Yields:
            Deferreds obtained from the queue, one per iteration.

        NOTE(review): the loop below has no exit and the Deferred of the
        worker is never inspected, so neither the end of the stream nor an
        exception raised by ``func`` is reported to the consumer.
        """
        inbox = DeferredQueue()

        def _pump():
            # Executed via deferToThread; queue access is marshalled back
            # through the reactor with callFromThread.
            for item in func(*args, **kw):
                reactor.callFromThread(inbox.put, item)

        # The worker's Deferred is held for the generator's lifetime but
        # deliberately not observed.
        _worker = threads.deferToThread(_pump)
        while True:
            yield inbox.get()

    @inlineCallbacks
    def echo_loop(self, stub, prefix='', interval=1.0):
        """Repeatedly send a numbered echo request and print the reply.

        Args:
            stub: object providing ``GetEcho``; the call is made through
                ``threads.deferToThread``.
            prefix: text placed in front of every printed line.
            interval: passed as the ``delay`` field of each ``Echo`` request
                and also used as the pause between iterations.
        """
        seq = 0
        while True:
            msg = 'ECHO%05d' % seq
            pr('{}sending echo {}'.format(prefix, msg))
            reply = yield threads.deferToThread(
                stub.GetEcho, Echo(msg=msg, delay=interval))
            pr('{} got echo {}'.format(prefix, reply.msg))
            seq += 1
            # Pause without blocking the reactor before the next request.
            yield asleep(interval)

    @inlineCallbacks
    def receive_async_events(self, stub):
        """Consume streamed events and print every one whose ``seq`` is a
        multiple of 100.

        Args:
            stub: object providing ``ReceiveStreamedEvents``, which is called
                with an ``Empty`` message through ``async_receive_stream``.
        """
        stream = self.async_receive_stream(
            stub.ReceiveStreamedEvents, Empty())
        for pending in stream:
            event = yield pending
            if event.seq % 100 != 0:
                # Only a sample of the events is printed.
                continue
            pr('event received: %s %s %s' % (
                event.seq, event.type, event.details))
56
57
if __name__ == '__main__':
    # Script entry point: connect to the local service, schedule two echo
    # loops and the event receiver, then hand control to the reactor.
    stub = ExperimentalServiceStub(grpc.insecure_channel('localhost:50050'))
    client_services = ClientServices()

    # (prefix, interval) for each echo loop; the second loop's output is
    # indented by 40 spaces so the two can be told apart.
    echo_settings = (
        ('', 0.2),
        (' ' * 40, 2),
    )
    for prefix, interval in echo_settings:
        reactor.callLater(0, client_services.echo_loop, stub, prefix, interval)
    reactor.callLater(0, client_services.receive_async_events, stub)
    reactor.run()
65 reactor.run()