blob: c66f084cba5469d6b34ae72c56f1baad18479e77 [file] [log] [blame]
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -07001#!/usr/bin/env python
Zack Williams41513bf2018-07-07 20:08:35 -07002# Copyright 2017-present Open Networking Foundation
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070015
Zsolt Harasztie39523b2016-10-16 19:30:34 -070016import time
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070017from Queue import Queue
Zsolt Harasztie39523b2016-10-16 19:30:34 -070018
19import grpc
20from google.protobuf.empty_pb2 import Empty
21from twisted.internet import reactor
22from twisted.internet import threads
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070023from twisted.internet.defer import Deferred, inlineCallbacks, DeferredQueue, \
24 returnValue
Zsolt Harasztie39523b2016-10-16 19:30:34 -070025
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070026from common.utils.asleep import asleep
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070027from streaming_pb2 import ExperimentalServiceStub, Echo, Packet
Zsolt Harasztie39523b2016-10-16 19:30:34 -070028
Zsolt Harasztie39523b2016-10-16 19:30:34 -070029t0 = time.time()
30
31
32def pr(s):
33 print '%lf %s' % (time.time() - t0, s)
34
35
36class ClientServices(object):
37
38 def async_receive_stream(self, func, *args, **kw):
39 queue = DeferredQueue()
40 def _execute():
41 for result in func(*args, **kw):
42 reactor.callFromThread(queue.put, result)
43 _ = threads.deferToThread(_execute)
44 while 1:
45 yield queue.get()
46
47 @inlineCallbacks
48 def echo_loop(self, stub, prefix='', interval=1.0):
49 """Send an echo message and print its return value"""
50 seq = 0
51 while 1:
52 msg = 'ECHO%05d' % seq
53 pr('{}sending echo {}'.format(prefix, msg))
54 request = Echo(msg=msg, delay=interval)
55 response = yield threads.deferToThread(stub.GetEcho, request)
56 pr('{} got echo {}'.format(prefix, response.msg))
57 seq += 1
58 yield asleep(interval)
59
60 @inlineCallbacks
61 def receive_async_events(self, stub):
62 e = Empty()
63 for next in self.async_receive_stream(stub.ReceiveStreamedEvents, e):
64 event = yield next
65 if event.seq % 100 == 0:
66 pr('event received: %s %s %s' % (
67 event.seq, event.type, event.details))
68
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070069 @inlineCallbacks
70 def send_packet_stream(self, stub, interval):
71 queue = Queue()
72
73 @inlineCallbacks
74 def get_next_from_queue():
75 packet = yield queue.get()
76 returnValue(packet)
77
78 def packet_generator():
79 while 1:
80 packet = queue.get(block=True)
81 yield packet
82
83 def stream(stub):
84 """This is executed on its own thread"""
85 generator = packet_generator()
86 result = stub.SendPackets(generator)
87 print 'Got this after sending packets:', result, type(result)
88 return result
89
90 reactor.callInThread(stream, stub)
91
92 while 1:
93 len = queue.qsize()
94 if len < 100:
95 packet = Packet(source=42, content='beefstew')
96 queue.put(packet)
97 yield asleep(interval)
98
Zsolt Harasztie39523b2016-10-16 19:30:34 -070099
100if __name__ == '__main__':
101 client_services = ClientServices()
102 channel = grpc.insecure_channel('localhost:50050')
103 stub = ExperimentalServiceStub(channel)
104 reactor.callLater(0, client_services.echo_loop, stub, '', 0.2)
105 reactor.callLater(0, client_services.echo_loop, stub, 40*' ', 2)
106 reactor.callLater(0, client_services.receive_async_events, stub)
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700107 reactor.callLater(0, client_services.send_packet_stream, stub, 0.0000001)
Zsolt Harasztie39523b2016-10-16 19:30:34 -0700108 reactor.run()