blob: 8df630e6f511f598141d3daf29d0138889e66383 [file] [log] [blame]
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -05001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18Utilities to handle gRPC server and client side code in a Twisted environment
19"""
20import structlog
21from concurrent.futures import Future
22from twisted.internet import reactor
23from twisted.internet.defer import Deferred
24from twisted.python.threadable import isInIOThread
25
26
27log = structlog.get_logger()
28
29
def twisted_async(func):
    """
    Decorator that runs a gRPC method implementation on the Twisted
    reactor thread and hands its result back to the calling thread.

    gRPC methods normally are called on the futures.ThreadPool threads,
    so these methods cannot directly use Twisted protocol constructs.
    If the implementation of the methods needs to touch Twisted, it is
    safer (or mandatory) to wrap the method with this decorator, which
    schedules the inner method on the Twisted thread from the foreign
    thread and ensures that the result is passed back to that thread.

    Behaviour of the decorated callable:

    * Called on the Twisted I/O thread: ``func`` is invoked directly and
      whatever it returns (possibly a Deferred) is returned unchanged.
    * Called on any other thread: ``func`` is scheduled on the reactor
      with ``reactor.callFromThread`` and the caller blocks, without a
      timeout, until a result is available. If ``func`` returns a
      Deferred, the caller receives the value the Deferred fires with.
      Any error is logged through the module logger and re-raised in
      the calling thread.

    Example usage:

    When implementing a gRPC server, typical pattern is:

        class SpamService(SpamServicer):

            def GetBadSpam(self, request, context):
                '''this is called from a ThreadPoolExecutor thread'''
                # generally unsafe to make Twisted calls

            @twisted_async
            def GetSpamSafely(self, request, context):
                '''this method now is executed on the Twisted main thread'''
                # safe to call any Twisted protocol functions

            @twisted_async
            @inlineCallbacks
            def GetAsyncSpam(self, request, context):
                '''this generator can use inlineCallbacks Twisted style'''
                result = yield some_async_twisted_call(request)
                returnValue(result)

    """
    # Imported locally so this block does not depend on a change to the
    # module's import list.
    from functools import wraps

    # wraps() keeps the decorated method's __name__ / __doc__ visible to
    # logging, debugging and introspection tools.
    @wraps(func)
    def in_thread_wrapper(*args, **kw):

        # Already on the reactor thread: call straight through. Waiting on
        # a Future here would block the very thread that has to fill it.
        if isInIOThread():
            return func(*args, **kw)

        # Carries the outcome from the reactor thread back to this thread.
        f = Future()

        def twisted_wrapper():
            # Runs on the reactor thread.
            try:
                d = func(*args, **kw)
                if isinstance(d, Deferred):
                    # Asynchronous result: complete the Future when the
                    # Deferred fires.
                    d.addCallback(f.set_result)
                    # NOTE(review): Twisted hands errbacks a Failure
                    # wrapper rather than the raw exception; it is stored
                    # as-is to keep the behaviour callers already see.
                    # Confirm whether unwrapping is preferable.
                    d.addErrback(f.set_exception)
                else:
                    # Synchronous result: available immediately.
                    f.set_result(d)

            except Exception as e:
                f.set_exception(e)

        reactor.callFromThread(twisted_wrapper)
        try:
            # Blocks the foreign thread until the reactor side completes.
            result = f.result()
        except Exception as e:
            log.exception(e=e, func=func, args=args, kw=kw)
            raise

        return result

    return in_thread_wrapper
108
109