#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Some consul related convenience functions
"""
20
from structlog import get_logger
from consul import Consul
from random import randint

# Module-wide structured logger used by every helper in this file.
log = get_logger()
26
Khen Nursimulu441dedd2016-10-05 14:44:26 -070027
def connect_to_consult(consul_endpoint):
    """
    Build a consul client for the given endpoint
    :param consul_endpoint: a <host>:<port> string
    :return: a Consul client object bound to that host and port
    """
    log.debug('getting-service-endpoint', consul=consul_endpoint)

    # Split once and trim each piece; a missing port surfaces as IndexError
    # and a non-numeric port as ValueError.
    fields = [field.strip() for field in consul_endpoint.split(':')]

    return Consul(host=fields[0], port=int(fields[1]))
35
36
def verify_all_services_healthy(consul_endpoint, service_name=None,
                                number_of_expected_services=None):
    """
    Verify in consul that services are healthy.

    When service_name is given, only that service is checked. Otherwise
    every service listed in the consul catalog must be healthy and, if
    number_of_expected_services is given, the catalog must contain exactly
    that many services.

    :param consul_endpoint: a <host>:<port> string
    :param service_name: name of service to check, optional
    :param number_of_expected_services: number of services the catalog is
           expected to hold, optional
    :return: true if healthy, false otherwise
    """
    consul = connect_to_consult(consul_endpoint)

    def has_passing_entry(name):
        # Only the second element of the reply is used; a service counts as
        # healthy when the passing-only query returns a non-empty list.
        _, passing = consul.health.service(name, passing=True)
        return passing != []

    # Single-service mode: the catalog is not consulted at all.
    if service_name is not None:
        return has_passing_entry(service_name)

    names = get_all_services(consul_endpoint).keys()

    if number_of_expected_services is not None:
        if len(names) != number_of_expected_services:
            return False

    # Stops at the first unhealthy service; an empty catalog yields True.
    return all(has_passing_entry(name) for name in names)
69
70
def get_all_services(consul_endpoint):
    """
    Fetch the service catalog from consul
    :param consul_endpoint: a <host>:<port> string
    :return: the second element of the consul catalog.services() reply
    """
    log.debug('getting-service-verify-health')

    # The first element of the reply is not needed here.
    _, catalog = connect_to_consult(consul_endpoint).catalog.services()

    return catalog
79
80
def get_endpoint_from_consul(consul_endpoint, service_name):
    """
    Look up, from consul, the service name specified by service_name
    :param consul_endpoint: a <host>:<port> string
    :param service_name: name of the service to resolve
    :return: a '<ServiceAddress>:<ServicePort>' string for one randomly
             chosen catalog entry of the service
    :raises Exception: if consul has no entry for the service
    """
    log.debug('getting-service-info', service=service_name)

    consul = connect_to_consult(consul_endpoint)
    _, services = consul.catalog.service(service_name)

    # Raising is the only failure path: the former os.exit(1) that followed
    # the raise was unreachable (and os was never imported).
    if not services:
        raise Exception(
            'Cannot find service {} in consul'.format(service_name))

    # pick local addresses when resolving a service via consul
    # see CORD-818 (https://jira.opencord.org/browse/CORD-818)
    # NOTE(review): the selection below is uniformly random over all
    # returned entries, not restricted to local addresses -- confirm
    # whether the CORD-818 behaviour is still intended here.
    service = services[randint(0, len(services) - 1)]
    endpoint = '{}:{}'.format(service['ServiceAddress'],
                              service['ServicePort'])

    return endpoint
102
103
if __name__ == '__main__':
    # Manual smoke test against a hard-coded consul instance.
    # Parenthesized single-argument print behaves the same on Python 2 and
    # is also valid syntax on Python 3.
    print(get_endpoint_from_consul('10.100.198.220:8500', 'kafka'))