Zsolt Haraszti | e39523b | 2016-10-16 19:30:34 -0700 | [diff] [blame] | 1 | import time |
| 2 | |
| 3 | import grpc |
| 4 | from google.protobuf.empty_pb2 import Empty |
| 5 | from twisted.internet import reactor |
| 6 | from twisted.internet import threads |
| 7 | from twisted.internet.defer import Deferred, inlineCallbacks, DeferredQueue |
| 8 | |
| 9 | from streaming_pb2 import ExperimentalServiceStub, Echo |
| 10 | |
| 11 | |
| 12 | def asleep(t): |
| 13 | d = Deferred() |
| 14 | reactor.callLater(t, d.callback, None) |
| 15 | return d |
| 16 | |
| 17 | t0 = time.time() |
| 18 | |
| 19 | |
| 20 | def pr(s): |
| 21 | print '%lf %s' % (time.time() - t0, s) |
| 22 | |
| 23 | |
| 24 | class ClientServices(object): |
| 25 | |
| 26 | def async_receive_stream(self, func, *args, **kw): |
| 27 | queue = DeferredQueue() |
| 28 | def _execute(): |
| 29 | for result in func(*args, **kw): |
| 30 | reactor.callFromThread(queue.put, result) |
| 31 | _ = threads.deferToThread(_execute) |
| 32 | while 1: |
| 33 | yield queue.get() |
| 34 | |
| 35 | @inlineCallbacks |
| 36 | def echo_loop(self, stub, prefix='', interval=1.0): |
| 37 | """Send an echo message and print its return value""" |
| 38 | seq = 0 |
| 39 | while 1: |
| 40 | msg = 'ECHO%05d' % seq |
| 41 | pr('{}sending echo {}'.format(prefix, msg)) |
| 42 | request = Echo(msg=msg, delay=interval) |
| 43 | response = yield threads.deferToThread(stub.GetEcho, request) |
| 44 | pr('{} got echo {}'.format(prefix, response.msg)) |
| 45 | seq += 1 |
| 46 | yield asleep(interval) |
| 47 | |
| 48 | @inlineCallbacks |
| 49 | def receive_async_events(self, stub): |
| 50 | e = Empty() |
| 51 | for next in self.async_receive_stream(stub.ReceiveStreamedEvents, e): |
| 52 | event = yield next |
| 53 | if event.seq % 100 == 0: |
| 54 | pr('event received: %s %s %s' % ( |
| 55 | event.seq, event.type, event.details)) |
| 56 | |
| 57 | |
| 58 | if __name__ == '__main__': |
| 59 | client_services = ClientServices() |
| 60 | channel = grpc.insecure_channel('localhost:50050') |
| 61 | stub = ExperimentalServiceStub(channel) |
| 62 | reactor.callLater(0, client_services.echo_loop, stub, '', 0.2) |
| 63 | reactor.callLater(0, client_services.echo_loop, stub, 40*' ', 2) |
| 64 | reactor.callLater(0, client_services.receive_async_events, stub) |
| 65 | reactor.run() |