blob: 2df70fd3b0d5c97532f2666d17f10b0852c0059b [file] [log] [blame]
Wei-Yu Chenad55cb82022-02-15 20:07:01 +08001# SPDX-FileCopyrightText: 2020 The Magma Authors.
2# SPDX-FileCopyrightText: 2022 Open Networking Foundation <support@opennetworking.org>
3#
4# SPDX-License-Identifier: BSD-3-Clause
Wei-Yu Chen49950b92021-11-08 19:19:18 +08005
6import logging
7import os
8
9import grpc
10from configuration.exceptions import LoadConfigError
11from configuration.service_configs import load_service_config
12
# Value for the "grpc.keepalive_time_ms" channel option, in milliseconds
# (30 seconds).
GRPC_KEEPALIVE_MS = 30_000
14
15
class ServiceRegistry:
    """
    ServiceRegistry provides the framework to discover services.

    ServiceRegistry takes care of service naming, and sets the connection
    params like ip/port, TLS, certs, etc based on service level configuration.

    All state lives in class-level dictionaries and every method is a
    static method, so the registry is shared by the whole process.
    """

    # Lazily loaded contents of the "service_registry" service config.
    _REGISTRY = {}
    # Lazily loaded contents of the "control_proxy" service config.
    _PROXY_CONFIG = {}
    # Reusable gRPC channels, keyed by the authority string.
    _CHANNELS_CACHE = {}

    LOCAL = 'local'
    CLOUD = 'cloud'

    @staticmethod
    def get_service_address(service):
        """
        Returns the (host, port) tuple for the service.

        Args:
            service (string): Name of the service
        Returns:
            (host, port) tuple
        Raises:
            ValueError if the service is unknown
        """
        registry = ServiceRegistry.get_registry()
        if service not in registry["services"]:
            raise ValueError("Invalid service name: %s" % service)
        service_conf = registry["services"][service]
        return service_conf["ip_address"], service_conf["port"]

    @staticmethod
    def add_service(name, ip_address, port):
        """
        Adds a service to the registry, replacing any entry with the
        same name.

        Args:
            name (string): Service name
            ip_address (string): ip address string
            port (int): service port
        """
        registry = ServiceRegistry.get_registry()
        registry["services"][name] = {"ip_address": ip_address, "port": port}

    @staticmethod
    def list_services():
        """
        Returns the services in the registry.

        Returns:
            dict mapping each service name to its
            {"ip_address": ..., "port": ...} entry. This is the registry's
            own dict, not a copy.
        """
        return ServiceRegistry.get_registry()["services"]

    @staticmethod
    def reset():
        """
        Removes all the entries in the registry
        """
        ServiceRegistry.get_registry()["services"] = {}

    @staticmethod
    def get_bootstrap_rpc_channel():
        """
        Returns a RPC channel to the bootstrap service in CLOUD.

        The endpoint and the root CA path are read from the control proxy
        config; the bootstrap address doubles as the channel authority.

        Returns:
            grpc channel
        Raises:
            ValueError if the root CA certificate file does not exist
        """
        proxy_config = ServiceRegistry.get_proxy_config()
        (ip, port) = (
            proxy_config['bootstrap_address'],
            proxy_config['bootstrap_port'],
        )
        authority = proxy_config['bootstrap_address']

        try:
            # Context manager so the file handle is closed deterministically.
            with open(proxy_config['rootca_cert'], 'rb') as rootca_f:
                rootca = rootca_f.read()
        except FileNotFoundError as exp:
            raise ValueError("SSL cert not found: %s" % exp) from exp

        ssl_creds = grpc.ssl_channel_credentials(rootca)
        return create_grpc_channel(ip, port, authority, ssl_creds)

    @staticmethod
    def get_rpc_channel(
        service, destination, proxy_cloud_connections=True,
        grpc_options=None,
    ):
        """
        Returns a RPC channel to the service. The connection params
        are obtained from the service registry and used.

        Channels to local services and channels that go through the
        control proxy are cached per authority and reused on later calls.
        Direct cloud channels are created fresh on every call. When a
        cached channel is returned, grpc_options is not applied.

        Args:
            service (string): Name of the service
            destination (string): ServiceRegistry.LOCAL or ServiceRegistry.CLOUD
            proxy_cloud_connections (bool): Override to connect direct to cloud
            grpc_options (list): list of gRPC options params for the channel;
                defaults to a keepalive of GRPC_KEEPALIVE_MS
        Returns:
            grpc channel
        Raises:
            ValueError if the service is unknown
        """
        proxy_config = ServiceRegistry.get_proxy_config()

        # Control proxy uses the :authority: HTTP header to route to services.
        if destination == ServiceRegistry.LOCAL:
            authority = '%s.local' % (service)
        else:
            authority = '%s-%s' % (service, proxy_config['cloud_address'])

        should_use_proxy = proxy_config['proxy_cloud_connections'] and \
            proxy_cloud_connections

        # If speaking to a local service or to the proxy, the grpc channel
        # can be reused. If speaking to the cloud directly, the client cert
        # could become stale after the next bootstrapper run.
        should_reuse_channel = should_use_proxy or \
            (destination == ServiceRegistry.LOCAL)
        if should_reuse_channel:
            channel = ServiceRegistry._CHANNELS_CACHE.get(authority, None)
            if channel is not None:
                return channel

        if grpc_options is None:
            grpc_options = [
                ("grpc.keepalive_time_ms", GRPC_KEEPALIVE_MS),
            ]
        # We need to figure out the ip and port to connect, if we need to use
        # SSL and the authority to use.
        if destination == ServiceRegistry.LOCAL:
            # Connect to the local service directly
            (ip, port) = ServiceRegistry.get_service_address(service)
            channel = create_grpc_channel(
                ip, port, authority,
                options=grpc_options,
            )
        elif should_use_proxy:
            # Connect to the cloud via local control proxy
            try:
                # Only the proxy's address comes from the registry; the port
                # is always taken from the control proxy config.
                ip, _ = ServiceRegistry.get_service_address("control_proxy")
                port = proxy_config['local_port']
            except ValueError as err:
                # control_proxy is not registered: fall back to loopback.
                logging.error(err)
                (ip, port) = ('127.0.0.1', proxy_config['local_port'])
            channel = create_grpc_channel(
                ip, port, authority,
                options=grpc_options,
            )
        else:
            # Connect to the cloud directly
            ip = proxy_config['cloud_address']
            port = proxy_config['cloud_port']
            ssl_creds = get_ssl_creds()
            channel = create_grpc_channel(
                ip, port, authority, ssl_creds,
                options=grpc_options,
            )
        if should_reuse_channel:
            ServiceRegistry._CHANNELS_CACHE[authority] = channel
        return channel

    @staticmethod
    def get_registry():
        """
        Returns _REGISTRY which holds the contents from the
        config/service/service_registry.yml file. It's a static member and
        the .yml file is loaded only once.

        If loading fails with LoadConfigError, the error is logged and an
        empty registry ({"services": {}}) is used instead.
        """
        if not ServiceRegistry._REGISTRY:
            try:
                ServiceRegistry._REGISTRY = load_service_config(
                    "service_registry",
                )
            except LoadConfigError as err:
                logging.error(err)
                ServiceRegistry._REGISTRY = {"services": {}}
        return ServiceRegistry._REGISTRY

    @staticmethod
    def get_proxy_config():
        """
        Returns the control proxy config. The config file is loaded only
        once and cached.

        If loading fails with LoadConfigError, the error is logged and a
        fallback config containing only 'proxy_cloud_connections': True is
        used; lookups of any other key on that fallback raise KeyError.
        """
        if not ServiceRegistry._PROXY_CONFIG:
            try:
                ServiceRegistry._PROXY_CONFIG = load_service_config(
                    'control_proxy',
                )
            except LoadConfigError as err:
                logging.error(err)
                ServiceRegistry._PROXY_CONFIG = {
                    'proxy_cloud_connections': True,
                }
        return ServiceRegistry._PROXY_CONFIG
218
219
def set_grpc_cipher_suites():
    """
    Set the cipher suites to be used for the gRPC TLS connection.

    Writes a colon-separated cipher list to the GRPC_SSL_CIPHER_SUITES
    environment variable of the current process.

    TODO (praveenr) t19265877: Update nghttpx in the cloud to recent version
    and delete this. The current nghttpx version doesn't support the
    ciphers needed by default for gRPC.
    """
    cipher_suites = (
        "ECDHE-ECDSA-AES256-GCM-SHA384",
        "ECDHE-RSA-AES256-GCM-SHA384",
        "ECDHE-ECDSA-CHACHA20-POLY1305",
        "ECDHE-RSA-CHACHA20-POLY1305",
        "ECDHE-ECDSA-AES128-GCM-SHA256",
        "ECDHE-RSA-AES128-GCM-SHA256",
        "ECDHE-ECDSA-AES256-SHA384",
        "ECDHE-RSA-AES256-SHA384",
        "ECDHE-ECDSA-AES128-SHA256",
        "ECDHE-RSA-AES128-SHA256",
    )
    os.environ["GRPC_SSL_CIPHER_SUITES"] = ":".join(cipher_suites)
233
234
def get_ssl_creds():
    """
    Get the SSL credentials to use to communicate securely.
    We use client side TLS auth, with the cert and keys
    obtained during bootstrapping of the gateway.

    The root CA, gateway certificate and gateway key paths are read from
    the 'rootca_cert', 'gateway_cert' and 'gateway_key' entries of the
    control proxy config.

    Returns:
        gRPC ssl creds
    Raises:
        ValueError if the cert or key filename in the
            control proxy config is incorrect.
    """
    proxy_config = ServiceRegistry.get_proxy_config()
    try:
        # All three files are opened before any is read, so a missing file
        # is reported before credentials are partially loaded.
        with open(proxy_config['rootca_cert'], 'rb') as rootca_f, \
                open(proxy_config['gateway_cert'], encoding="utf-8") as cert_f, \
                open(proxy_config['gateway_key'], encoding="utf-8") as key_f:
            rootca = rootca_f.read()
            # Cert and key are read as text and re-encoded to bytes.
            cert = cert_f.read().encode()
            key = key_f.read().encode()
    except FileNotFoundError as exp:
        # Chain the original error so the missing path stays in the traceback.
        raise ValueError("SSL cert not found: %s" % exp) from exp
    return grpc.ssl_channel_credentials(
        root_certificates=rootca,
        certificate_chain=cert,
        private_key=key,
    )
263
264
def create_grpc_channel(ip, port, authority, ssl_creds=None, options=None):
    """
    Build a gRPC channel to "<ip>:<port>".

    The channel's default authority is always set to ``authority``; any
    caller-supplied options are appended after it.

    Args:
        ip: IP address of the remote endpoint
        port: port of the remote endpoint
        authority: HTTP header that control proxy uses for routing
        ssl_creds: when given, a secure channel is created with these
            credentials; otherwise the channel is insecure
        options: configuration options for gRPC channel
    Returns:
        grpc channel
    """
    channel_options = [('grpc.default_authority', authority)]
    if options is not None:
        channel_options.extend(options)
    target = '%s:%s' % (ip, port)

    # Plaintext channel when no credentials were supplied.
    if ssl_creds is None:
        return grpc.insecure_channel(target=target, options=channel_options)

    # The cipher suite environment variable is set before the secure
    # channel is created.
    set_grpc_cipher_suites()
    return grpc.secure_channel(
        target=target,
        credentials=ssl_creds,
        options=channel_options,
    )
293 return channel