blob: 7eeac38bab40fb2607906e97576555d4523b471a [file] [log] [blame]
khenb95fe9a2016-10-05 11:15:25 -07001#
Zsolt Haraszti3eb27a52017-01-03 21:56:48 -08002# Copyright 2017 the original author or authors.
khenb95fe9a2016-10-05 11:15:25 -07003#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18Some consul related convenience functions
19"""
20
khenb95fe9a2016-10-05 11:15:25 -070021from structlog import get_logger
khenb95fe9a2016-10-05 11:15:25 -070022from consul import Consul
23from random import randint
ggowdru85150c22017-04-26 07:24:40 -070024from common.utils.nethelpers import get_my_primary_local_ipv4
khenb95fe9a2016-10-05 11:15:25 -070025
26log = get_logger()
27
Khen Nursimulu441dedd2016-10-05 14:44:26 -070028
def connect_to_consult(consul_endpoint):
    """
    Build a Consul client for the given endpoint.
    :param consul_endpoint: a <host>:<port> string
    :return: a Consul client created with that host and port
    """
    log.debug('getting-service-endpoint', consul=consul_endpoint)

    # Split once; whitespace around either field is stripped before use.
    fields = consul_endpoint.split(':')
    address = fields[0].strip()
    port_number = int(fields[1].strip())

    return Consul(host=address, port=port_number)
36
37
def verify_all_services_healthy(consul_endpoint, service_name=None,
                                number_of_expected_services=None):
    """
    Check service health as reported by consul.

    When service_name is given, only that service is checked. Otherwise
    every service listed in the catalog must be healthy, and, if
    number_of_expected_services is given, the catalog must list exactly
    that many services.
    :param consul_endpoint: a <host>:<port> string
    :param service_name: name of service to check, optional
    :param number_of_expected_services: number of services to check for,
           optional
    :return: True if healthy, False otherwise
    """
    client = connect_to_consult(consul_endpoint)

    def has_passing_instance(name):
        # The health query is issued with passing=True; an empty result
        # list is treated as "not healthy".
        _, passing_entries = client.health.service(name, passing=True)
        return passing_entries != []

    # Single-service mode: no need to look at the catalog at all.
    if service_name is not None:
        return has_passing_instance(service_name)

    service_names = get_all_services(consul_endpoint).keys()

    count_mismatch = (number_of_expected_services is not None and
                      len(service_names) != number_of_expected_services)
    if count_mismatch:
        return False

    # all() stops at the first unhealthy service, like an early return.
    return all(has_passing_instance(name) for name in service_names)
70
71
def get_all_services(consul_endpoint):
    """
    Fetch the services listed in the consul catalog.
    :param consul_endpoint: a <host>:<port> string
    :return: the services part of the catalog query result
    """
    log.debug('getting-service-verify-health')

    client = connect_to_consult(consul_endpoint)
    # The catalog call returns a pair; only the second element is used.
    _index, catalog_services = client.catalog.services()

    return catalog_services
80
def get_endpoint_from_consul(consul_endpoint, service_name):
    """
    Get endpoint of service_name from consul.

    An instance whose address equals this host's primary IPv4 address is
    preferred; if there is none, one of the registered instances is
    picked at random.
    :param consul_endpoint: a <host>:<port> string
    :param service_name: name of service for which endpoint
           needs to be found.
    :return: service endpoint as an '<address>:<port>' string
    :raises Exception: if consul has no entry for service_name
    """
    log.debug('getting-service-info', service=service_name)

    consul = connect_to_consult(consul_endpoint)
    _, services = consul.catalog.service(service_name)

    # Truthiness also covers a None result, not just an empty list.
    if not services:
        raise Exception(
            'Cannot find service {} in consul'.format(service_name))

    # Get host IPv4 address.
    local_ipv4 = get_my_primary_local_ipv4()

    # If the IP address of this host matches the address of one of the
    # requested service's instances, pick that instance's endpoint.
    service = next(
        (entry for entry in services
         if entry['ServiceAddress'] == local_ipv4),
        None)

    if service is not None:
        log.debug("picking address locally")
    else:
        # Service is not available locally: pick a random endpoint for
        # the service from the list.
        service = services[randint(0, len(services) - 1)]

    return '{}:{}'.format(service['ServiceAddress'],
                          service['ServicePort'])
122
123
# Manual smoke test: look up the 'kafka' service on a fixed consul endpoint.
if __name__ == '__main__':
    # Parenthesised single-argument form prints the same text on
    # Python 2 and is valid syntax on Python 3.
    print(get_endpoint_from_consul('10.100.198.220:8500', 'kafka'))