Zsolt Haraszti | 023ea7c | 2016-10-16 19:30:34 -0700 | [diff] [blame] | 1 | #!/usr/bin/env python |
| 2 | |
Zsolt Haraszti | e39523b | 2016-10-16 19:30:34 -0700 | [diff] [blame] | 3 | import time |
| 4 | |
| 5 | import grpc |
| 6 | from google.protobuf.empty_pb2 import Empty |
| 7 | from twisted.internet import reactor |
| 8 | from twisted.internet import threads |
| 9 | from twisted.internet.defer import Deferred, inlineCallbacks, DeferredQueue |
| 10 | |
Zsolt Haraszti | 023ea7c | 2016-10-16 19:30:34 -0700 | [diff] [blame] | 11 | from common.utils.asleep import asleep |
Zsolt Haraszti | e39523b | 2016-10-16 19:30:34 -0700 | [diff] [blame] | 12 | from streaming_pb2 import ExperimentalServiceStub, Echo |
| 13 | |
| 14 | |
# Reference instant for the relative timestamps printed by pr(); captured once
# when the module is loaded.
t0 = time.time()


def pr(s):
    """Print ``s`` prefixed with the number of seconds elapsed since ``t0``.

    The single-argument call form of ``print`` is used so the line produces
    the same output whether it is parsed as a Python 2 print statement or as
    the Python 3 print function.

    :param s: text to print after the elapsed-time prefix
    """
    print('%lf %s' % (time.time() - t0, s))
| 20 | |
| 21 | |
class ClientServices(object):
    """Helpers that drive the blocking gRPC stub from the Twisted reactor."""

    # Marker delivered through the result queue of async_receive_stream() once
    # the blocking stream has been fully consumed without error.
    END_OF_STREAM = object()

    def async_receive_stream(self, func, *args, **kw):
        """Adapt a blocking, iterable-returning call to a stream of Deferreds.

        ``func(*args, **kw)`` is iterated via ``threads.deferToThread`` so the
        reactor is never blocked; every item it produces is handed over to
        the reactor thread and placed on a ``DeferredQueue``.

        This is a generator: each iteration yields the Deferred returned by
        ``queue.get()``, which fires with the next item of the stream.  When
        the stream ends, the next Deferred fires with ``END_OF_STREAM``; when
        iterating the stream raises, the next Deferred fails with that error.
        Consumers should stop iterating after either of those, since later
        Deferreds will never fire.

        :param func: blocking callable returning an iterable of results
        :param args: positional arguments passed to ``func``
        :param kw: keyword arguments passed to ``func``
        """
        queue = DeferredQueue()

        def _execute():
            # Runs off the reactor thread; the queue may only be touched from
            # the reactor thread, hence callFromThread.
            for result in func(*args, **kw):
                reactor.callFromThread(queue.put, result)

        worker = threads.deferToThread(_execute)
        # Without these handlers the outcome of the worker would be dropped
        # and a consumer would wait forever once the stream ended or failed.
        # On failure the Failure object itself is queued, which makes the
        # Deferred handed to the consumer fire its errback chain.
        worker.addCallbacks(
            lambda _ignored: queue.put(self.END_OF_STREAM),
            queue.put)

        while 1:
            yield queue.get()

    @inlineCallbacks
    def echo_loop(self, stub, prefix='', interval=1.0):
        """Endlessly send numbered echo requests and print each reply.

        Each round builds an ``Echo`` message named ``ECHO<seq>``, calls the
        blocking ``stub.GetEcho`` through ``threads.deferToThread``, prints
        the reply, then sleeps for ``interval`` seconds before the next round.

        :param stub: service stub exposing a blocking ``GetEcho`` call
        :param prefix: text put in front of every printed line
        :param interval: pause between rounds, in seconds; also sent as the
            ``delay`` field of the request (presumably a server-side reply
            delay -- verify against the service definition)
        """
        seq = 0
        while 1:
            msg = 'ECHO%05d' % seq
            pr('{}sending echo {}'.format(prefix, msg))
            request = Echo(msg=msg, delay=interval)
            response = yield threads.deferToThread(stub.GetEcho, request)
            pr('{} got echo {}'.format(prefix, response.msg))
            seq += 1
            yield asleep(interval)

    @inlineCallbacks
    def receive_async_events(self, stub):
        """Consume the server's event stream, printing every 100th event.

        Returns (instead of waiting forever) when the stream ends or fails.

        :param stub: service stub exposing ``ReceiveStreamedEvents``
        """
        e = Empty()
        stream = self.async_receive_stream(stub.ReceiveStreamedEvents, e)
        for pending in stream:
            try:
                event = yield pending
            except Exception as exc:
                # The blocking stream raised in its worker thread.
                pr('event stream failed: %s' % exc)
                break
            if event is self.END_OF_STREAM:
                pr('event stream closed')
                break
            if event.seq % 100 == 0:
                pr('event received: %s %s %s' % (
                    event.seq, event.type, event.details))
| 54 | |
| 55 | |
if __name__ == '__main__':
    # Connect to the locally running experimental service.
    channel = grpc.insecure_channel('localhost:50050')
    stub = ExperimentalServiceStub(channel)
    client_services = ClientServices()

    # Work to start as soon as the reactor is running: a fast echo loop, a
    # slow echo loop whose output is shifted 40 columns to the right, and the
    # streamed event receiver.
    startup_calls = (
        (client_services.echo_loop, stub, '', 0.2),
        (client_services.echo_loop, stub, 40*' ', 2),
        (client_services.receive_async_events, stub),
    )
    for startup_call in startup_calls:
        reactor.callLater(0, *startup_call)

    reactor.run()