blob: df4dd58b16c109d0186cb1d9a67a4a92fb2b90f8 [file] [log] [blame]
#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
16
17"""
Some Consul-related convenience functions.
19"""
20
khenb95fe9a2016-10-05 11:15:25 -070021from structlog import get_logger
khenb95fe9a2016-10-05 11:15:25 -070022from consul import Consul
23from random import randint
ggowdru85150c22017-04-26 07:24:40 -070024from common.utils.nethelpers import get_my_primary_local_ipv4
khenb95fe9a2016-10-05 11:15:25 -070025
26log = get_logger()
27
Khen Nursimulu441dedd2016-10-05 14:44:26 -070028
def connect_to_consult(consul_endpoint):
    """
    Build a Consul client for the given endpoint.
    :param consul_endpoint: a <host>:<port> string
    :return: a Consul client created with that host and port
    """
    log.debug('getting-service-endpoint', consul=consul_endpoint)

    # Only the first two ':'-separated fields are used: host, then port.
    fields = consul_endpoint.split(':')
    host = fields[0].strip()
    port = int(fields[1].strip())

    return Consul(host=host, port=port)
36
37
def verify_all_services_healthy(consul_endpoint, service_name=None,
                                number_of_expected_services=None):
    """
    Check service health as reported by consul.

    When service_name is given, only that service is checked. Otherwise
    every service in the consul catalog is checked and, if
    number_of_expected_services is given, the catalog must contain exactly
    that many services.
    :param consul_endpoint: a <host>:<port> string
    :param service_name: name of service to check, optional
    :param number_of_expected_services: number of services the catalog is
                                        expected to list, optional
    :return: true if healthy, false otherwise
    """
    consul = connect_to_consult(consul_endpoint)

    def has_passing_entry(name):
        # A service counts as healthy when the health query restricted to
        # passing=True returns a non-empty result.
        _, passing = consul.health.service(name, passing=True)
        return passing != []

    if service_name is not None:
        return has_passing_entry(service_name)

    catalog = get_all_services(consul_endpoint)
    names = catalog.keys()

    if number_of_expected_services is not None:
        if len(names) != number_of_expected_services:
            return False

    # all() stops at the first unhealthy service, like an early return.
    return all(has_passing_entry(name) for name in names)
70
71
def get_all_services(consul_endpoint):
    """
    Fetch the service catalog from consul.
    :param consul_endpoint: a <host>:<port> string
    :return: the second element of the consul.catalog.services() response
    """
    log.debug('getting-service-verify-health')

    catalog = connect_to_consult(consul_endpoint).catalog
    _index, all_services = catalog.services()

    return all_services
79
khenaidoo032d3302017-06-09 14:50:04 -040080
def get_all_instances_of_service(consul_endpoint, service_name):
    """
    Look up every catalog entry consul holds for one service and log each.
    :param consul_endpoint: a <host>:<port> string
    :param service_name: name of the service to look up
    :return: the entries returned by consul.catalog.service(service_name)
    """
    log.debug('getting-all-instances-of-service', service=service_name)

    catalog = connect_to_consult(consul_endpoint).catalog
    _index, instances = catalog.service(service_name)

    for instance in instances:
        name = instance['ServiceName']
        service_id = instance['ServiceID']
        service_port = instance['ServicePort']
        create_index = instance['CreateIndex']
        log.debug('service',
                  name=name,
                  serviceid=service_id,
                  serviceport=service_port,
                  createindex=create_index)

    return instances
95
96
def get_endpoint_from_consul(consul_endpoint, service_name):
    """
    Get endpoint of service_name from consul.
    :param consul_endpoint: a <host>:<port> string
    :param service_name: name of service for which endpoint
                         needs to be found.
    :return: service endpoint as an '<address>:<port>' string
    :raises Exception: if consul lists no entry for service_name
    """
    log.debug('getting-service-info', service=service_name)

    consul = connect_to_consult(consul_endpoint)
    _, services = consul.catalog.service(service_name)

    # An empty (or missing) result means the service is unknown to consul.
    # The original followed this raise with an unreachable `os.exit(1)`
    # (`os` is not imported and has no `exit`), which has been dropped.
    if not services:
        raise Exception(
            'Cannot find service {} in consul'.format(service_name))

    # Prefer an instance whose registered address equals the address
    # reported by get_my_primary_local_ipv4() for this host.
    local_ipv4 = get_my_primary_local_ipv4()
    for service in services:
        if service['ServiceAddress'] == local_ipv4:
            log.debug("picking address locally")
            return '{}:{}'.format(service['ServiceAddress'],
                                  service['ServicePort'])

    # If the service is not available locally, pick a random endpoint
    # for the service from the list.
    service = services[randint(0, len(services) - 1)]
    return '{}:{}'.format(service['ServiceAddress'],
                          service['ServicePort'])
138
139
def get_healthy_instances(consul_endpoint, service_name=None,
                          number_of_expected_services=None):
    """
    Report whether services registered in consul are healthy.

    NOTE: despite its name, this function returns a boolean rather than a
    list of instances. It performs the same check as
    verify_all_services_healthy and delegates to it, so the two cannot
    drift apart.
    :param consul_endpoint: a <host>:<port> string
    :param service_name: name of service to check, optional
    :param number_of_expected_services: number of services the catalog is
                                        expected to list, optional
    :return: true if healthy, false otherwise
    """
    return verify_all_services_healthy(
        consul_endpoint,
        service_name=service_name,
        number_of_expected_services=number_of_expected_services)
172
173
if __name__ == '__main__':
    # Manual check against a hard-coded consul endpoint. Other calls that
    # have been exercised the same way (print the result to inspect it):
    #   get_endpoint_from_consul('10.100.198.220:8500', 'kafka')
    #   get_healthy_instances('10.100.198.220:8500', 'voltha-health')
    #   get_healthy_instances('10.100.198.220:8500')
    get_all_instances_of_service(consul_endpoint='10.100.198.220:8500',
                                 service_name='voltha-grpc')