blob: 8df630e6f511f598141d3daf29d0138889e66383 [file] [log] [blame]
Zsolt Haraszticd22adc2016-10-25 00:13:06 -07001#
Zsolt Haraszti3eb27a52017-01-03 21:56:48 -08002# Copyright 2017 the original author or authors.
Zsolt Haraszticd22adc2016-10-25 00:13:06 -07003#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18Utilities to handle gRPC server and client side code in a Twisted environment
19"""
Zsolt Haraszti66862032016-11-28 14:28:39 -080020import structlog
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070021from concurrent.futures import Future
22from twisted.internet import reactor
23from twisted.internet.defer import Deferred
Zsolt Haraszti00d9a842016-11-23 11:18:23 -080024from twisted.python.threadable import isInIOThread
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070025
26
Zsolt Haraszti66862032016-11-28 14:28:39 -080027log = structlog.get_logger()
28
29
def twisted_async(func):
    """
    Decorator that runs a gRPC method implementation on the Twisted
    reactor thread, allowing asynchronous programming in Twisted while
    serving a gRPC call.

    gRPC methods normally are called on the futures.ThreadPool threads,
    so these methods cannot directly use Twisted protocol constructs.
    If the implementation of the methods needs to touch Twisted, it is
    safer (or mandatory) to wrap the method with this decorator, which will
    schedule the inner method on the reactor thread and ensure that the
    result (or the exception) is passed back to the calling foreign thread.

    Behaviour of the returned wrapper:

    - Called on the reactor (IO) thread: func is invoked directly and its
      return value is handed back unchanged (this may be a Deferred).
    - Called on any other thread: func is scheduled with
      reactor.callFromThread() and the calling thread blocks until the
      outcome is known. If func returns a Deferred, the wrapper waits for
      it to fire. Exceptions raised by func, or carried by a failed
      Deferred, are logged and re-raised in the calling thread.

    Example usage:

    When implementing a gRPC server, typical pattern is:

        class SpamService(SpamServicer):

            def GetBadSpam(self, request, context):
                '''this is called from a ThreadPoolExecutor thread'''
                # generally unsafe to make Twisted calls

            @twisted_async
            def GetSpamSafely(self, request, context):
                '''this method now is executed on the Twisted main thread'''
                # safe to call any Twisted protocol functions

            @twisted_async
            @inlineCallbacks
            def GetAsyncSpam(self, request, context):
                '''this generator can use inlineCallbacks Twisted style'''
                result = yield some_async_twisted_call(request)
                returnValue(result)

    """
    # Imported locally so this function does not depend on edits to the
    # module-level import block.
    from functools import wraps

    @wraps(func)  # keep the wrapped method's __name__ / __doc__
    def in_thread_wrapper(*args, **kw):

        if isInIOThread():
            # Already on the reactor thread: blocking on a Future here would
            # stall the reactor, so just call through.
            return func(*args, **kw)

        # Carries the outcome from the reactor thread back to this thread.
        f = Future()

        def _done(result):
            f.set_result(result)

        def _error(failure):
            # Errbacks receive a twisted Failure wrapper; forward the
            # underlying exception so the waiting thread re-raises the real
            # error rather than the wrapper object.
            f.set_exception(failure.value)

        def twisted_wrapper():
            # Runs on the reactor thread.
            try:
                d = func(*args, **kw)
            except Exception as e:
                f.set_exception(e)
                return

            if isinstance(d, Deferred):
                # Resolve the future once the Deferred fires, either way.
                d.addCallbacks(_done, _error)
            else:
                f.set_result(d)

        reactor.callFromThread(twisted_wrapper)
        try:
            # Blocks the calling (non-reactor) thread until the outcome is set.
            return f.result()
        except Exception as e:
            log.exception(e=e, func=func, args=args, kw=kw)
            raise

    return in_thread_wrapper
108
109