blob: 044852fade8d42e2605db13d94ab90a1ad136835 [file] [log] [blame]
Wei-Yu Chen49950b92021-11-08 19:19:18 +08001"""
2Copyright 2020 The Magma Authors.
3
4This source code is licensed under the BSD-style license found in the
5LICENSE file in the root directory of this source tree.
6
7Unless required by applicable law or agreed to in writing, software
8distributed under the License is distributed on an "AS IS" BASIS,
9WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10See the License for the specific language governing permissions and
11limitations under the License.
12"""
13
14import logging
15import os
16
17import grpc
18from configuration.exceptions import LoadConfigError
19from configuration.service_configs import load_service_config
20
21GRPC_KEEPALIVE_MS = 30 * 1000
22
23
24class ServiceRegistry:
25 """
26 ServiceRegistry provides the framework to discover services.
27
28 ServiceRegistry takes care of service naming, and sets the connection
29 params like ip/port, TLS, certs, etc based on service level configuration.
30 """
31
32 _REGISTRY = {}
33 _PROXY_CONFIG = {}
34 _CHANNELS_CACHE = {}
35
36 LOCAL = 'local'
37 CLOUD = 'cloud'
38
39 @staticmethod
40 def get_service_address(service):
41 """
42 Returns the (host, port) tuple for the service.
43
44 Args:
45 service (string): Name of the service
46 Returns:
47 (host, port) tuple
48 Raises:
49 ValueError if the service is unknown
50 """
51 registry = ServiceRegistry.get_registry()
52 if service not in registry["services"]:
53 raise ValueError("Invalid service name: %s" % service)
54 service_conf = registry["services"][service]
55 return service_conf["ip_address"], service_conf["port"]
56
57 @staticmethod
58 def add_service(name, ip_address, port):
59 """
60 Adds a service to the registry.
61
62 Args:
63 name (string): Service name
64 ip_address (string): ip address string
65 port (int): service port
66 """
67 registry = ServiceRegistry.get_registry()
68 service = {"ip_address": ip_address, "port": port}
69 registry["services"][name] = service
70
71 @staticmethod
72 def list_services():
73 """
74 Returns the list of services in the registry.
75
76 Returns:
77 list of services
78 """
79 return ServiceRegistry.get_registry()["services"]
80
81 @staticmethod
82 def reset():
83 """
84 Removes all the entries in the registry
85 """
86 ServiceRegistry.get_registry()["services"] = {}
87
88 @staticmethod
89 def get_bootstrap_rpc_channel():
90 """
91 Returns a RPC channel to the bootstrap service in CLOUD.
92 Returns:
93 grpc channel
94 """
95 proxy_config = ServiceRegistry.get_proxy_config()
96 (ip, port) = (
97 proxy_config['bootstrap_address'],
98 proxy_config['bootstrap_port'],
99 )
100 authority = proxy_config['bootstrap_address']
101
102 try:
103 rootca = open(proxy_config['rootca_cert'], 'rb').read()
104 except FileNotFoundError as exp:
105 raise ValueError("SSL cert not found: %s" % exp)
106
107 ssl_creds = grpc.ssl_channel_credentials(rootca)
108 return create_grpc_channel(ip, port, authority, ssl_creds)
109
110 @staticmethod
111 def get_rpc_channel(
112 service, destination, proxy_cloud_connections=True,
113 grpc_options=None,
114 ):
115 """
116 Returns a RPC channel to the service. The connection params
117 are obtained from the service registry and used.
118 TBD: pool connections to a service and reuse them. Right
119 now each call creates a new TCP/SSL/HTTP2 connection.
120
121 Args:
122 service (string): Name of the service
123 destination (string): ServiceRegistry.LOCAL or ServiceRegistry.CLOUD
124 proxy_cloud_connections (bool): Override to connect direct to cloud
125 grpc_options (list): list of gRPC options params for the channel
126 Returns:
127 grpc channel
128 Raises:
129 ValueError if the service is unknown
130 """
131 proxy_config = ServiceRegistry.get_proxy_config()
132
133 # Control proxy uses the :authority: HTTP header to route to services.
134 if destination == ServiceRegistry.LOCAL:
135 authority = '%s.local' % (service)
136 else:
137 authority = '%s-%s' % (service, proxy_config['cloud_address'])
138
139 should_use_proxy = proxy_config['proxy_cloud_connections'] and \
140 proxy_cloud_connections
141
142 # If speaking to a local service or to the proxy, the grpc channel
143 # can be reused. If speaking to the cloud directly, the client cert
144 # could become stale after the next bootstrapper run.
145 should_reuse_channel = should_use_proxy or \
146 (destination == ServiceRegistry.LOCAL)
147 if should_reuse_channel:
148 channel = ServiceRegistry._CHANNELS_CACHE.get(authority, None)
149 if channel is not None:
150 return channel
151
152 if grpc_options is None:
153 grpc_options = [
154 ("grpc.keepalive_time_ms", GRPC_KEEPALIVE_MS),
155 ]
156 # We need to figure out the ip and port to connnect, if we need to use
157 # SSL and the authority to use.
158 if destination == ServiceRegistry.LOCAL:
159 # Connect to the local service directly
160 (ip, port) = ServiceRegistry.get_service_address(service)
161 channel = create_grpc_channel(
162 ip, port, authority,
163 options=grpc_options,
164 )
165 elif should_use_proxy:
166 # Connect to the cloud via local control proxy
167 try:
168 (ip, unused_port) = ServiceRegistry.get_service_address(
169 "control_proxy",
170 )
171 port = proxy_config['local_port']
172 except ValueError as err:
173 logging.error(err)
174 (ip, port) = ('127.0.0.1', proxy_config['local_port'])
175 channel = create_grpc_channel(
176 ip, port, authority,
177 options=grpc_options,
178 )
179 else:
180 # Connect to the cloud directly
181 ip = proxy_config['cloud_address']
182 port = proxy_config['cloud_port']
183 ssl_creds = get_ssl_creds()
184 channel = create_grpc_channel(
185 ip, port, authority, ssl_creds,
186 options=grpc_options,
187 )
188 if should_reuse_channel:
189 ServiceRegistry._CHANNELS_CACHE[authority] = channel
190 return channel
191
192 @staticmethod
193 def get_registry():
194 """
195 Returns _REGISTRY which holds the contents from the
196 config/service/service_registry.yml file. Its a static member and the
197 .yml file is loaded only once.
198 """
199 if not ServiceRegistry._REGISTRY:
200 try:
201 ServiceRegistry._REGISTRY = load_service_config(
202 "service_registry",
203 )
204 except LoadConfigError as err:
205 logging.error(err)
206 ServiceRegistry._REGISTRY = {"services": {}}
207 return ServiceRegistry._REGISTRY
208
209 @staticmethod
210 def get_proxy_config():
211 """
212 Returns the control proxy config. The config file is loaded only
213 once and cached.
214 """
215 if not ServiceRegistry._PROXY_CONFIG:
216 try:
217 ServiceRegistry._PROXY_CONFIG = load_service_config(
218 'control_proxy',
219 )
220 except LoadConfigError as err:
221 logging.error(err)
222 ServiceRegistry._PROXY_CONFIG = {
223 'proxy_cloud_connections': True,
224 }
225 return ServiceRegistry._PROXY_CONFIG
226
227
228def set_grpc_cipher_suites():
229 """
230 Set the cipher suites to be used for the gRPC TLS connection.
231 TODO (praveenr) t19265877: Update nghttpx in the cloud to recent version
232 and delete this. The current nghttpx version doesn't support the
233 ciphers needed by default for gRPC.
234 """
235 os.environ["GRPC_SSL_CIPHER_SUITES"] = "ECDHE-ECDSA-AES256-GCM-SHA384:"\
236 "ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-CHACHA20-POLY1305:"\
237 "ECDHE-RSA-CHACHA20-POLY1305:ECDHE-ECDSA-AES128-GCM-SHA256:"\
238 "ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-SHA384:"\
239 "ECDHE-RSA-AES256-SHA384:ECDHE-ECDSA-AES128-SHA256:"\
240 "ECDHE-RSA-AES128-SHA256"
241
242
243def get_ssl_creds():
244 """
245 Get the SSL credentials to use to communicate securely.
246 We use client side TLS auth, with the cert and keys
247 obtained during bootstrapping of the gateway.
248
249 Returns:
250 gRPC ssl creds
251 Raises:
252 ValueError if the cert or key filename in the
253 control proxy config is incorrect.
254 """
255 proxy_config = ServiceRegistry.get_proxy_config()
256 try:
257 with open(proxy_config['rootca_cert'], 'rb') as rootca_f:
258 with open(proxy_config['gateway_cert'], encoding="utf-8") as cert_f:
259 with open(proxy_config['gateway_key'], encoding="utf-8") as key_f:
260 rootca = rootca_f.read()
261 cert = cert_f.read().encode()
262 key = key_f.read().encode()
263 ssl_creds = grpc.ssl_channel_credentials(
264 root_certificates=rootca,
265 certificate_chain=cert,
266 private_key=key,
267 )
268 except FileNotFoundError as exp:
269 raise ValueError("SSL cert not found: %s" % exp)
270 return ssl_creds
271
272
273def create_grpc_channel(ip, port, authority, ssl_creds=None, options=None):
274 """
275 Helper function to create a grpc channel.
276
277 Args:
278 ip: IP address of the remote endpoint
279 port: port of the remote endpoint
280 authority: HTTP header that control proxy uses for routing
281 ssl_creds: Enables SSL
282 options: configuration options for gRPC channel
283 Returns:
284 grpc channel
285 """
286 grpc_options = [('grpc.default_authority', authority)]
287 if options is not None:
288 grpc_options.extend(options)
289 if ssl_creds is not None:
290 set_grpc_cipher_suites()
291 channel = grpc.secure_channel(
292 target='%s:%s' % (ip, port),
293 credentials=ssl_creds,
294 options=grpc_options,
295 )
296 else:
297 channel = grpc.insecure_channel(
298 target='%s:%s' % (ip, port),
299 options=grpc_options,
300 )
301 return channel