blob: a2074771d8b61d183f66d651afb3c8a3b9341074 [file] [log] [blame]
Zsolt Harasztie39523b2016-10-16 19:30:34 -07001import grpc
2from concurrent import futures
3from concurrent.futures import Future
4from twisted.internet import reactor
5from twisted.internet.defer import Deferred, inlineCallbacks, returnValue
6
7from openflow_13_pb2 import add_OpenFlowServicer_to_server, \
8 OpenFlowServicer
9from streaming_pb2 import add_ExperimentalServiceServicer_to_server, \
10 AsyncEvent, ExperimentalServiceServicer, Echo
11
12
class ShutDown(object):
    """Holder for the process-wide shutdown flag polled by serving loops."""

    # Acts as a semaphore: once set to True, every loop that checks it is
    # expected to stop.
    stop = False
15
16
def asleep(t):
    """
    Return a Deferred that the reactor fires with None after a delay of
    `t`, so an inlineCallbacks generator can `yield` it to pause without
    blocking the reactor.
    """
    timer = Deferred()
    fire = timer.callback
    reactor.callLater(t, fire, None)
    return timer
21
22
class ShuttingDown(Exception):
    """
    Raised when a twisted_async-wrapped call is attempted while
    ShutDown.stop is set.
    """
24
25
def twisted_async(func):
    """
    Decorator that runs a gRPC method implementation on the twisted
    thread, allowing asynchronous programming in Twisted while serving
    a gRPC call.

    gRPC methods normally are called on the futures.ThreadPool threads,
    so these methods cannot directly use Twisted protocol constructs.
    A method wrapped with this decorator is scheduled onto the reactor
    thread instead, and the calling (foreign) thread blocks until the
    outcome is available. `func` may return a plain value or a Deferred
    (for example when it is also decorated with inlineCallbacks).

    The wrapper returns the value `func` returned, or the value its
    Deferred fired with. It raises ShuttingDown if ShutDown.stop is set
    before the call is scheduled or while it is still pending; otherwise
    it re-raises, in the calling thread, the exception `func` raised or
    the exception carried by the Failure its Deferred failed with.
    """
    import functools  # local import: only this decorator needs it

    # Seconds between checks of ShutDown.stop while a call is pending.
    poll_interval = 0.1

    @functools.wraps(func)
    def in_thread_wrapper(*args, **kw):
        if ShutDown.stop:
            raise ShuttingDown()
        f = Future()

        def _failed(failure):
            # Errbacks receive a twisted Failure wrapper; store the real
            # exception so that f.result() re-raises something callers
            # can catch as an ordinary exception.
            f.set_exception(failure.value)

        def twisted_wrapper():
            # Runs on the reactor thread.
            try:
                d = func(*args, **kw)
            except Exception as e:
                f.set_exception(e)
                return
            if isinstance(d, Deferred):
                d.addCallbacks(f.set_result, _failed)
            else:
                f.set_result(d)

        reactor.callFromThread(twisted_wrapper)

        # Wait in slices instead of blocking indefinitely: during shutdown
        # the reactor thread may itself be blocked waiting for this pool
        # thread to finish, in which case the future would never complete.
        while True:
            finished, _ = futures.wait([f], timeout=poll_interval)
            if finished:
                return f.result()
            if ShutDown.stop:
                raise ShuttingDown()

    return in_thread_wrapper
75
76
class Service(ExperimentalServiceServicer):
    """
    Servicer for the experimental service: a delayed echo call and an
    open-ended stream of generated AsyncEvent messages. The packet
    methods are empty placeholders.
    """

    def __init__(self):
        # Sequence number given to the next AsyncEvent; only read and
        # incremented inside get_next_event.
        self.event_seq = 0

    @twisted_async
    @inlineCallbacks
    def GetEcho(self, request, context):
        """
        Wait for request.delay via asleep(), then return an Echo whose msg
        is request.msg with ' <<' appended. Executed on the twisted thread.
        """
        print('got Echo({}) request'.format(request.msg))
        yield asleep(request.delay)
        msg = request.msg + ' <<'
        print(' Echo({}) reply'.format(msg))
        returnValue(Echo(msg=msg))

    @twisted_async
    @inlineCallbacks
    def get_next_event(self):
        """
        called on the twisted thread

        Returns the next AsyncEvent (details='foo') and advances the
        sequence counter.
        """
        yield asleep(0.000001)
        event = AsyncEvent(seq=self.event_seq, details='foo')
        self.event_seq += 1
        returnValue(event)

    def ReceiveStreamedEvents(self, request, context):
        """
        called on a thread-pool thread

        Generator that yields events from get_next_event until
        ShutDown.stop is set, then ends the stream.
        """
        print('got ReceiveStreamedEvents request')
        while not ShutDown.stop:
            try:
                event = self.get_next_event()
            except ShuttingDown:
                # The flag was set between the loop check and the call;
                # finish the stream the same way the loop check would
                # instead of failing the RPC with an exception.
                return
            yield event

    def ReceivePackets(self, request, context):
        # Placeholder: no behaviour implemented, returns None.
        pass

    def SendPackets(self, request, context):
        # Placeholder: no behaviour implemented, returns None.
        pass
113
114
class OpenFlow(OpenFlowServicer):
    """OpenFlow servicer whose RPC handlers are all empty placeholders."""

    def EchoRequest(self, request, context):
        """Placeholder handler: does nothing and returns None."""

    def SendPacketsOutMessages(self, request, context):
        """Placeholder handler: does nothing and returns None."""

    def ReceivePacketInMessages(self, request, context):
        """Placeholder handler: does nothing and returns None."""
125
126
if __name__ == '__main__':
    # Serve both servicers from one gRPC server backed by a 10-thread
    # pool, and let the Twisted reactor own the main thread.
    thread_pool = futures.ThreadPoolExecutor(max_workers=10)
    server = grpc.server(thread_pool)
    add_ExperimentalServiceServicer_to_server(Service(), server)
    add_OpenFlowServicer_to_server(OpenFlow(), server)
    server.add_insecure_port('[::]:50050')
    server.start()

    def shutdown():
        """Reactor 'before shutdown' hook: stop serving, then drain the pool."""
        # Tell every loop and twisted_async wrapper to stop first.
        ShutDown.stop = True
        # Stop the server before shutting the executor down, so that no
        # newly arriving RPC is submitted to an already shut-down pool.
        server.stop(0)
        # NOTE(review): this runs on the reactor thread and blocks it until
        # the pool threads exit; a handler still waiting on the reactor at
        # this point cannot be completed by it -- confirm handlers give up
        # once ShutDown.stop is set.
        thread_pool.shutdown(wait=True)

    reactor.addSystemEventTrigger('before', 'shutdown', shutdown)
    reactor.run()