blob: c2ab286a7ae81ef954393b45975a3368fe021498 [file] [log] [blame]
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -07001#!/usr/bin/env python
2
Zsolt Harasztie39523b2016-10-16 19:30:34 -07003import grpc
4from concurrent import futures
5from concurrent.futures import Future
6from twisted.internet import reactor
7from twisted.internet.defer import Deferred, inlineCallbacks, returnValue
8
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -07009from common.utils.asleep import asleep
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070010from google.protobuf.empty_pb2 import Empty
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070011
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070012from common.utils.grpc_utils import twisted_async
Zsolt Harasztie39523b2016-10-16 19:30:34 -070013from streaming_pb2 import add_ExperimentalServiceServicer_to_server, \
14 AsyncEvent, ExperimentalServiceServicer, Echo
15
16
class ShutDown(object):
    """Process-wide stop signal shared by the long-running service loops."""

    # Plain boolean flag (not a real semaphore): loops poll it and exit once
    # it has been set to True.
    stop = False
19
20
class ShuttingDown(Exception):
    """Exception type for signalling that the server is shutting down.

    NOTE(review): nothing in the visible part of this file raises or catches
    it -- confirm whether it is still needed.
    """
22
23
class Service(ExperimentalServiceServicer):
    """Servicer implementing the ExperimentalService RPC handlers.

    Handlers wrapped with ``@twisted_async`` + ``@inlineCallbacks`` are
    written as Twisted generator coroutines; the undecorated ones run as
    plain (blocking) functions / generators.
    """

    def __init__(self):
        # Sequence number stamped on the next AsyncEvent handed out.
        self.event_seq = 0

    @twisted_async
    @inlineCallbacks
    def GetEcho(self, request, context):
        """Reply with the request message suffixed by ' <<'.

        Waits ``request.delay`` (via ``asleep``) before answering.
        """
        print('got Echo({}) request'.format(request.msg))
        yield asleep(request.delay)
        msg = request.msg + ' <<'
        print(' Echo({}) reply'.format(msg))
        returnValue(Echo(msg=msg))

    @twisted_async
    @inlineCallbacks
    def get_next_event(self):
        """called on the twisted thread

        Produces the next AsyncEvent and advances the sequence counter.
        """
        # Tiny asleep before building the event; presumably this hands
        # control back to the reactor -- TODO confirm against asleep().
        yield asleep(0.000001)
        event = AsyncEvent(seq=self.event_seq, details='foo')
        self.event_seq += 1
        returnValue(event)

    def ReceiveStreamedEvents(self, request, context):
        """called on a thread-pool thread

        Generator that keeps yielding events until ShutDown.stop is set.
        """
        print('got ReceiveStreamedEvents request')
        # The flag is re-checked before every event, so setting it ends the
        # stream after the event currently being produced.
        while not ShutDown.stop:
            yield self.get_next_event()

    def ReceivePackets(self, request, context):
        """Placeholder handler: does nothing and returns None."""
        pass

    def SendPackets(self, request, context):
        """Drain the incoming request iterator and return Empty().

        Prints a progress line after every 1000 items received.
        """
        count = 0
        for _ in request:
            count += 1
            if count % 1000 == 0:
                print('%s got %d packets' % (20 * ' ', count))
        return Empty()
Zsolt Harasztie39523b2016-10-16 19:30:34 -070065
66
if __name__ == '__main__':
    # Serve the experimental service from a 10-worker thread pool on port
    # 50050 (all interfaces, insecure), then hand control to the reactor.
    thread_pool = futures.ThreadPoolExecutor(max_workers=10)
    server = grpc.server(thread_pool)
    add_ExperimentalServiceServicer_to_server(Service(), server)
    server.add_insecure_port('[::]:50050')
    server.start()

    def shutdown():
        """Stop the streaming loops, then the worker pool, then the server."""
        ShutDown.stop = True
        # NOTE(review): this runs from the reactor's 'before shutdown'
        # trigger and waits for every worker to finish; a worker that is
        # itself waiting on the reactor (e.g. inside a twisted_async call)
        # could stall here -- confirm against twisted_async's behaviour.
        thread_pool.shutdown(wait=True)
        server.stop(0)

    reactor.addSystemEventTrigger('before', 'shutdown', shutdown)
    reactor.run()