blob: 3d930926690dda649974047d3cbf4584be31b98e [file] [log] [blame]
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -07001#!/usr/bin/env python
2
Zsolt Harasztie39523b2016-10-16 19:30:34 -07003import time
4
5import grpc
6from google.protobuf.empty_pb2 import Empty
7from twisted.internet import reactor
8from twisted.internet import threads
9from twisted.internet.defer import Deferred, inlineCallbacks, DeferredQueue
10
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070011from common.utils.asleep import asleep
Zsolt Harasztie39523b2016-10-16 19:30:34 -070012from streaming_pb2 import ExperimentalServiceStub, Echo
13
14
Zsolt Harasztie39523b2016-10-16 19:30:34 -070015t0 = time.time()
16
17
18def pr(s):
19 print '%lf %s' % (time.time() - t0, s)
20
21
22class ClientServices(object):
23
24 def async_receive_stream(self, func, *args, **kw):
25 queue = DeferredQueue()
26 def _execute():
27 for result in func(*args, **kw):
28 reactor.callFromThread(queue.put, result)
29 _ = threads.deferToThread(_execute)
30 while 1:
31 yield queue.get()
32
33 @inlineCallbacks
34 def echo_loop(self, stub, prefix='', interval=1.0):
35 """Send an echo message and print its return value"""
36 seq = 0
37 while 1:
38 msg = 'ECHO%05d' % seq
39 pr('{}sending echo {}'.format(prefix, msg))
40 request = Echo(msg=msg, delay=interval)
41 response = yield threads.deferToThread(stub.GetEcho, request)
42 pr('{} got echo {}'.format(prefix, response.msg))
43 seq += 1
44 yield asleep(interval)
45
46 @inlineCallbacks
47 def receive_async_events(self, stub):
48 e = Empty()
49 for next in self.async_receive_stream(stub.ReceiveStreamedEvents, e):
50 event = yield next
51 if event.seq % 100 == 0:
52 pr('event received: %s %s %s' % (
53 event.seq, event.type, event.details))
54
55
56if __name__ == '__main__':
57 client_services = ClientServices()
58 channel = grpc.insecure_channel('localhost:50050')
59 stub = ExperimentalServiceStub(channel)
60 reactor.callLater(0, client_services.echo_loop, stub, '', 0.2)
61 reactor.callLater(0, client_services.echo_loop, stub, 40*' ', 2)
62 reactor.callLater(0, client_services.receive_async_events, stub)
63 reactor.run()