blob: df4dd58b16c109d0186cb1d9a67a4a92fb2b90f8 [file] [log] [blame]
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -05001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18Some consul related convenience functions
19"""
20
21from structlog import get_logger
22from consul import Consul
23from random import randint
24from common.utils.nethelpers import get_my_primary_local_ipv4
25
26log = get_logger()
27
28
def connect_to_consult(consul_endpoint):
    """
    Create a Consul client object for the given endpoint.
    :param consul_endpoint: a <host>:<port> string
    :return: a Consul object constructed with that host and port
    """
    log.debug('getting-service-endpoint', consul=consul_endpoint)

    # Split once and pick the two fields; surrounding whitespace is ignored.
    fields = consul_endpoint.split(':')
    address = fields[0].strip()
    port_number = int(fields[1].strip())

    return Consul(host=address, port=port_number)
36
37
def verify_all_services_healthy(consul_endpoint, service_name=None,
                                number_of_expected_services=None):
    """
    Check service health as reported by consul.

    If service_name is given, only that service is checked. Otherwise every
    service listed in the consul catalog is checked, and all of them must be
    healthy. A service counts as healthy when the consul health query for it,
    restricted to passing checks, returns a non-empty list.

    :param consul_endpoint: a <host>:<port> string
    :param service_name: name of service to check, optional
    :param number_of_expected_services: exact number of catalog services
    required, optional; only used when service_name is not given
    :return: True if healthy, False otherwise
    """
    client = connect_to_consult(consul_endpoint)

    def has_passing_entries(name):
        _index, passing = client.health.service(name, passing=True)
        return passing != []

    # Single-service mode: the catalog is not consulted at all.
    if service_name is not None:
        return has_passing_entries(service_name)

    names = get_all_services(consul_endpoint).keys()

    count_mismatch = (number_of_expected_services is not None and
                      len(names) != number_of_expected_services)
    if count_mismatch:
        return False

    # Stops at the first unhealthy service.
    return all(has_passing_entries(name) for name in names)
70
71
def get_all_services(consul_endpoint):
    """
    Fetch the service catalog from consul.
    :param consul_endpoint: a <host>:<port> string
    :return: the second element of the catalog.services() result; callers in
    this module use it as a mapping keyed by service name
    """
    log.debug('getting-service-verify-health')

    _index, catalog = connect_to_consult(consul_endpoint).catalog.services()

    return catalog
79
80
def get_all_instances_of_service(consul_endpoint, service_name):
    """
    Look up every catalog entry of a service and log each one.
    :param consul_endpoint: a <host>:<port> string
    :param service_name: name of the service to look up
    :return: the list of catalog entries returned by consul for the service
    """
    log.debug('getting-all-instances-of-service', service=service_name)

    client = connect_to_consult(consul_endpoint)
    _index, instances = client.catalog.service(service_name)

    # One debug record per instance, carrying its identifying fields.
    for instance in instances:
        log.debug(
            'service',
            name=instance['ServiceName'],
            serviceid=instance['ServiceID'],
            serviceport=instance['ServicePort'],
            createindex=instance['CreateIndex'],
        )

    return instances
95
96
def get_endpoint_from_consul(consul_endpoint, service_name):
    """
    Get endpoint of service_name from consul.

    An instance whose ServiceAddress equals this host's primary local IPv4
    address is preferred; if there is none, one instance is picked at random.

    :param consul_endpoint: a <host>:<port> string
    :param service_name: name of service for which endpoint
    needs to be found.
    :return: service endpoint as an '<address>:<port>' string
    :raises Exception: if consul returns no instance for service_name
    """
    log.debug('getting-service-info', service=service_name)

    consul = connect_to_consult(consul_endpoint)
    _, services = consul.catalog.service(service_name)

    if not services:
        raise Exception(
            'Cannot find service {} in consul'.format(service_name))

    # Prefer an instance registered with this host's own IPv4 address.
    local_ipv4 = get_my_primary_local_ipv4()
    for service in services:
        if service['ServiceAddress'] == local_ipv4:
            log.debug("picking address locally")
            return '{}:{}'.format(service['ServiceAddress'],
                                  service['ServicePort'])

    # The service is not available locally: pick a random instance.
    service = services[randint(0, len(services) - 1)]
    return '{}:{}'.format(service['ServiceAddress'],
                          service['ServicePort'])
138
139
def get_healthy_instances(consul_endpoint, service_name=None,
                          number_of_expected_services=None):
    """
    Report whether the requested service(s) are healthy in consul.

    Despite its name, this function returns a boolean, not a list of
    instances. It performs exactly the check implemented by
    verify_all_services_healthy and delegates to it so the two cannot drift
    apart.

    :param consul_endpoint: a <host>:<port> string
    :param service_name: name of service to check, optional; when omitted,
    every service in the consul catalog is checked
    :param number_of_expected_services: exact number of catalog services
    required, optional; only used when service_name is not given
    :return: True if healthy, False otherwise
    """
    return verify_all_services_healthy(
        consul_endpoint,
        service_name=service_name,
        number_of_expected_services=number_of_expected_services)
172
173
if __name__ == '__main__':
    # Manual smoke check against a hard-coded consul endpoint.
    # Other calls that can be tried here:
    #   get_endpoint_from_consul('10.100.198.220:8500', 'kafka')
    #   get_healthy_instances('10.100.198.220:8500', 'voltha-health')
    #   get_healthy_instances('10.100.198.220:8500')
    get_all_instances_of_service(consul_endpoint='10.100.198.220:8500',
                                 service_name='voltha-grpc')