blob: 588d4b33e444fc7be902c3e3558424dd5a450068 [file] [log] [blame]
#!/usr/bin/env python

from concurrent import futures
from concurrent.futures import Future
from functools import wraps

import grpc
from twisted.internet import reactor
from twisted.internet.defer import Deferred, inlineCallbacks, returnValue
from twisted.internet.threads import deferToThread

from common.utils.asleep import asleep

from streaming_pb2 import add_ExperimentalServiceServicer_to_server, \
    AsyncEvent, ExperimentalServiceServicer, Echo
13
14
class ShutDown(object):
    """Process-wide shutdown flag shared by every serving loop."""

    # Plain boolean flag (not a real semaphore): loops and wrapped gRPC
    # methods check it and stop doing work once it has been set to True.
    stop = False
17
18
class ShuttingDown(Exception):
    """Raised when a call is attempted after shutdown has been requested."""
20
21
def twisted_async(func):
    """
    This decorator can be used to implement a gRPC method on the twisted
    thread, allowing asynchronous programming in Twisted while serving
    a gRPC call.

    gRPC methods normally are called on the futures.ThreadPool threads,
    so these methods cannot directly use Twisted protocol constructs.
    If the implementation of the methods needs to touch Twisted, it is
    safer (or mandatory) to wrap the method with this decorator, which will
    call the inner method from the external thread and ensure that the
    result is passed back to the foreign thread.

    The returned wrapper blocks the calling thread until the wrapped
    function (or the Deferred it returns) has completed, then returns its
    result or re-raises its exception. It raises ShuttingDown without
    invoking the wrapped function when ShutDown.stop is set.
    """
    @wraps(func)
    def in_thread_wrapper(*args, **kw):

        if ShutDown.stop:
            raise ShuttingDown()

        # Hand-off point between the reactor thread (producer) and the
        # calling thread (consumer, blocked in f.result() below).
        f = Future()

        def _done(result):
            f.set_result(result)

        def _error(failure):
            # Errbacks receive a twisted Failure wrapper; unwrap it so the
            # caller sees the same kind of exception object as on the
            # synchronous error path below.
            f.set_exception(getattr(failure, 'value', failure))

        def twisted_wrapper():
            # Runs on the reactor thread.
            try:
                d = func(*args, **kw)
            except Exception as e:
                f.set_exception(e)
                return

            if isinstance(d, Deferred):
                d.addCallback(_done)
                d.addErrback(_error)
            else:
                # Plain (non-deferred) return value: already complete.
                f.set_result(d)

        reactor.callFromThread(twisted_wrapper)
        return f.result()

    return in_thread_wrapper
71
72
class Service(ExperimentalServiceServicer):
    """Demo servicer: a delayed echo call and an endless event stream."""

    def __init__(self):
        # Sequence number stamped on the next AsyncEvent handed out.
        self.event_seq = 0

    @twisted_async
    @inlineCallbacks
    def GetEcho(self, request, context):
        """
        Reply with an Echo whose msg is request.msg followed by ' <<'.

        The body runs on the twisted thread and yields on
        asleep(request.delay) before building the reply.
        """
        print('got Echo({}) request'.format(request.msg))
        yield asleep(request.delay)
        msg = request.msg + ' <<'
        print('    Echo({}) reply'.format(msg))
        returnValue(Echo(msg=msg))

    @twisted_async
    @inlineCallbacks
    def get_next_event(self):
        """called on the twisted thread"""
        yield asleep(0.0001)
        event = AsyncEvent(seq=self.event_seq, details='foo')
        self.event_seq += 1
        returnValue(event)

    def ReceiveStreamedEvents(self, request, context):
        """called on a thread-pool thread"""
        print('got ReceiveStreamedEvents request')
        # Keep streaming events until shutdown is requested.
        while not ShutDown.stop:
            yield self.get_next_event()

    def ReceivePackets(self, request, context):
        """Not implemented yet; returns None."""
        pass

    def SendPackets(self, request, context):
        """Not implemented yet; returns None."""
        pass
109
110
if __name__ == '__main__':
    # gRPC handlers run on this pool; the Twisted reactor runs on the main
    # thread.
    thread_pool = futures.ThreadPoolExecutor(max_workers=10)
    server = grpc.server(thread_pool)
    add_ExperimentalServiceServicer_to_server(Service(), server)
    server.add_insecure_port('[::]:50050')
    server.start()

    def shutdown():
        """Stop serving and drain the handler pool before the reactor exits."""
        ShutDown.stop = True
        # Stop the server first so it no longer submits work to the pool.
        server.stop(0)
        # Drain the pool off the reactor thread: in-flight handlers are
        # blocked waiting for the reactor to complete their calls, so
        # joining them from the reactor thread itself would deadlock.
        # Returning the Deferred makes the reactor wait for the drain
        # before moving on to the next shutdown phase.
        return deferToThread(thread_pool.shutdown, wait=True)

    reactor.addSystemEventTrigger('before', 'shutdown', shutdown)
    reactor.run()