#!/usr/bin/env python
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Zsolt Haraszti | 023ea7c | 2016-10-16 19:30:34 -0700 | [diff] [blame] | 15 | |
Zsolt Haraszti | e39523b | 2016-10-16 19:30:34 -0700 | [diff] [blame] | 16 | import time |
Zsolt Haraszti | cd22adc | 2016-10-25 00:13:06 -0700 | [diff] [blame] | 17 | from Queue import Queue |
Zsolt Haraszti | e39523b | 2016-10-16 19:30:34 -0700 | [diff] [blame] | 18 | |
| 19 | import grpc |
| 20 | from google.protobuf.empty_pb2 import Empty |
| 21 | from twisted.internet import reactor |
| 22 | from twisted.internet import threads |
Zsolt Haraszti | cd22adc | 2016-10-25 00:13:06 -0700 | [diff] [blame] | 23 | from twisted.internet.defer import Deferred, inlineCallbacks, DeferredQueue, \ |
| 24 | returnValue |
Zsolt Haraszti | e39523b | 2016-10-16 19:30:34 -0700 | [diff] [blame] | 25 | |
Zsolt Haraszti | 023ea7c | 2016-10-16 19:30:34 -0700 | [diff] [blame] | 26 | from common.utils.asleep import asleep |
Zsolt Haraszti | cd22adc | 2016-10-25 00:13:06 -0700 | [diff] [blame] | 27 | from streaming_pb2 import ExperimentalServiceStub, Echo, Packet |
Zsolt Haraszti | e39523b | 2016-10-16 19:30:34 -0700 | [diff] [blame] | 28 | |
# Reference instant captured when the module is loaded; pr() reports the
# time elapsed since this moment.
t0 = time.time()


def pr(s):
    """Print ``s`` prefixed with the seconds elapsed since ``t0``.

    The output line has the form ``<elapsed> <s>`` where ``<elapsed>`` is a
    float rendered with six decimal places.

    :param s: the text (or any object, rendered with ``%s``) to print.
    """
    # A single parenthesised argument prints exactly what the Python 2
    # print statement printed, and is also valid syntax on Python 3.
    print('%lf %s' % (time.time() - t0, s))
| 34 | |
| 35 | |
class ClientServices(object):
    """Client-side helpers that drive the experimental gRPC service.

    The gRPC stub calls used here are blocking, so every one of them is run
    on a reactor thread-pool thread; results are handed back to code running
    in the reactor thread.
    """

    def async_receive_stream(self, func, *args, **kw):
        """Adapt a blocking iterator into an endless stream of Deferreds.

        ``func(*args, **kw)`` is called and iterated on a thread-pool thread.
        Each item it produces is put on a ``DeferredQueue`` from the reactor
        thread, and this generator yields one ``queue.get()`` Deferred per
        iteration for the consumer to wait on.

        If ``func`` (or iterating its result) raises, the failure is put on
        the same queue so that the consumer's next Deferred fails instead of
        the error being dropped and the consumer waiting forever.

        The generator itself never finishes: if the blocking iterator ends
        without an error, nothing more is queued and later Deferreds simply
        never fire.

        :param func: blocking callable returning an iterable.
        :param args: positional arguments passed to ``func``.
        :param kw: keyword arguments passed to ``func``.
        :return: generator of Deferreds, one per expected stream item.
        """
        queue = DeferredQueue()

        def _execute():
            # Runs on a worker thread; queue.put must run in the reactor
            # thread, hence callFromThread.
            for result in func(*args, **kw):
                reactor.callFromThread(queue.put, result)

        worker = threads.deferToThread(_execute)
        # The errback runs in the reactor thread. A Failure handed to a
        # Deferred as its result sends it down the errback chain, so the
        # consumer sees the worker's exception on its next wait.
        worker.addErrback(queue.put)

        while True:
            yield queue.get()

    @inlineCallbacks
    def echo_loop(self, stub, prefix='', interval=1.0):
        """Send an echo message and print its return value, forever.

        :param stub: service stub providing the blocking ``GetEcho`` call.
        :param prefix: text put in front of every printed line.
        :param interval: sent to the server as the request's ``delay`` and
            also used as the pause between two consecutive requests.
        """
        seq = 0
        while True:
            msg = 'ECHO%05d' % seq
            pr('{}sending echo {}'.format(prefix, msg))
            request = Echo(msg=msg, delay=interval)
            # GetEcho blocks, so it is run on a thread-pool thread.
            response = yield threads.deferToThread(stub.GetEcho, request)
            pr('{} got echo {}'.format(prefix, response.msg))
            seq += 1
            yield asleep(interval)

    @inlineCallbacks
    def receive_async_events(self, stub):
        """Consume the server's event stream, printing every 100th event.

        :param stub: service stub providing ``ReceiveStreamedEvents``.
        """
        request = Empty()
        for pending in self.async_receive_stream(
                stub.ReceiveStreamedEvents, request):
            event = yield pending
            if event.seq % 100 == 0:
                pr('event received: %s %s %s' % (
                    event.seq, event.type, event.details))

    @inlineCallbacks
    def send_packet_stream(self, stub, interval):
        """Stream packets to the server through ``stub.SendPackets``.

        A thread-safe ``Queue`` connects the reactor thread (producer) to a
        worker thread on which the blocking ``SendPackets`` call consumes a
        generator fed from that queue.

        :param stub: service stub providing ``SendPackets``.
        :param interval: pause, in seconds, between producer iterations.
        """
        queue = Queue()

        def packet_generator():
            # Iterated on the worker thread; blocks until a packet is
            # available.
            while True:
                yield queue.get(block=True)

        def stream(stub):
            """This is executed on its own thread"""
            result = stub.SendPackets(packet_generator())
            print('%s %s %s' % (
                'Got this after sending packets:', result, type(result)))
            return result

        reactor.callInThread(stream, stub)

        while True:
            # Keep the backlog bounded: only add a packet while fewer than
            # 100 are waiting to be sent.
            if queue.qsize() < 100:
                queue.put(Packet(source=42, content='beefstew'))
            yield asleep(interval)
| 98 | |
Zsolt Haraszti | e39523b | 2016-10-16 19:30:34 -0700 | [diff] [blame] | 99 | |
if __name__ == '__main__':
    # Connect to the service, which is expected on localhost:50050.
    channel = grpc.insecure_channel('localhost:50050')
    stub = ExperimentalServiceStub(channel)
    client_services = ClientServices()

    # (callable, positional arguments) pairs, listed in the order they are
    # handed to the reactor: two echo loops with different print prefixes
    # and intervals, the event receiver, and the packet sender.
    startup_calls = (
        (client_services.echo_loop, (stub, '', 0.2)),
        (client_services.echo_loop, (stub, ' ' * 40, 2)),
        (client_services.receive_async_events, (stub,)),
        (client_services.send_packet_stream, (stub, 0.0000001)),
    )
    for method, call_args in startup_calls:
        reactor.callLater(0, method, *call_args)

    reactor.run()