#!/usr/bin/env python
#
# Copyright 2016 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
17import subprocess
18import time
19import logging
20import os
21import json
22from unittest import TestCase
23import re
24
# Absolute path of the directory that contains this test module.
this_dir = os.path.abspath(os.path.dirname(__file__))
26
27from test_utils import run_command_to_completion_with_raw_stdout, \
28 is_open, \
29 is_valid_ip, \
30 run_long_running_command_with_timeout, \
31 run_command_to_completion_with_stdout_in_list
32
33log = logging.getLogger(__name__)
34
35LOCAL_CONSUL_URL = "http://localhost:8500"
36LOCAL_CONSUL_DNS = "@localhost -p 8600"
37DOCKER_COMPOSE_FILE = "compose/docker-compose-system-test.yml"
38DOCKER_COMPOSE_FILE_SERVICES_COUNT = 7
39
40command_defs = dict(
41 makefile_fetch_images="grep \"docker pull\" Makefile",
42 make="make",
43 make_fetch="make fetch",
44 remove_env_directory="rm -rf venv-linux",
45 make_clean="make clean",
46 docker_images="docker images",
47 docker_stop="docker stop",
48 docker_rm="docker rm",
49 fluentd_logs="less /tmp/fluentd/data.log",
50 docker_voltha_logs="docker logs -f compose_voltha_1",
51 docker_compose_logs="docker-compose -f {} logs".format(
52 DOCKER_COMPOSE_FILE),
53 docker_stop_and_remove_all_containers="docker stop `docker ps -q` ; "
54 "docker rm `docker ps -a -q`",
55 docker_start_voltha="docker run -ti --rm cord/voltha",
56 docker_start_voltha_with_consul_ip="docker run -ti --rm --net="
57 "compose_default cord/voltha "
58 "/voltha/main.py --consul=",
59 docker_get_consul_ip="docker inspect "
60 "compose_consul_1 | jq -r "
61 "'.[0].NetworkSettings.Networks."
62 "compose_default.IPAddress'",
63 docker_compose_start_consul="docker-compose -f {} up -d "
64 "consul".format(DOCKER_COMPOSE_FILE),
65 docker_compose_start_all="docker-compose -f {} up -d "
66 .format(DOCKER_COMPOSE_FILE),
67 docker_compose_stop="docker-compose -f {} stop"
68 .format(DOCKER_COMPOSE_FILE),
69 docker_compose_rm_f="docker-compose -f {} rm -f"
70 .format(DOCKER_COMPOSE_FILE),
71 docker_compose_ps="docker-compose -f {} ps".format(DOCKER_COMPOSE_FILE),
72 docker_ps="docker ps",
73 docker_ps_count="docker ps -q | wc -l",
74 docker_compose_is_consul_up="docker-compose -f {} ps | grep consul"
75 .format(DOCKER_COMPOSE_FILE),
76 consul_get_leader_ip_port="curl -s {}/v1/status/leader | jq -r ."
77 .format(LOCAL_CONSUL_URL),
78 docker_compose_services_running="docker-compose -f {} ps -q"
79 .format(DOCKER_COMPOSE_FILE),
80 docker_compose_services_running_count="docker-compose -f {} ps -q | "
81 "grep Up "
82 "| wc -l".format(
83 DOCKER_COMPOSE_FILE),
84 docker_compose_services="docker-compose -f {} config --services"
85 .format(DOCKER_COMPOSE_FILE),
86 consul_get_services="curl -s {}/v1/catalog/services | jq -r ."
87 .format(LOCAL_CONSUL_URL),
88 consul_get_srv_voltha_health="curl -s {}/v1/catalog/service/voltha-health "
89 "| jq -r .".format(LOCAL_CONSUL_URL),
90 kafka_client_run_10_secs="python kafka/kafka-consumer.py -r 10",
91 consul_get_voltha_rest_a_record="dig {} voltha-health.service.consul"
92 .format(LOCAL_CONSUL_DNS),
93 consul_get_voltha_rest_ip="dig {} +short voltha-health.service.consul"
94 .format(LOCAL_CONSUL_DNS),
95 consul_get_voltha_service_port="dig {} +short "
96 "voltha-health.service.consul SRV | "
97 " awk \'{{print $3}}'"
98 .format(LOCAL_CONSUL_DNS),
99 docker_compose_scale_voltha_to_10="docker-compose -f {} scale "
100 "voltha=10".format(DOCKER_COMPOSE_FILE),
101 docker_compose_scaled_voltha_ps="docker-compose -f {} ps voltha | "
102 "grep Up | wc -l"
103 .format(DOCKER_COMPOSE_FILE),
104 consul_verify_voltha_registration="curl -s {}"
105 "/v1/kv/service/voltha/members?recurse |"
106 " jq -r .".format(LOCAL_CONSUL_DNS)
107)
108
109
class BuildMdTests(TestCase):
    """System tests that build voltha and run it under docker.

    Each test shells out to the command lines listed in ``command_defs``
    (make, docker, docker-compose, curl, dig) and checks their exit codes
    and output.  The methods are numbered; unittest runs test methods in
    alphabetical order, so presumably the numbering fixes the execution
    sequence (build first, then run).
    """

    # ------------------------------------------------------------------
    # tests
    # ------------------------------------------------------------------

    def test_01_setup(self):
        """Remove the virtualenv, run ``make clean`` and source env.sh."""
        print("Test_01_setup_Start:------------------")
        t0 = time.time()

        try:
            # remove the venv-linux directory
            print("Remove venv-linux ...")
            self._exec_ok(command_defs['remove_env_directory'])

            # make clean
            print("Make clean ...")
            self._exec_ok(command_defs['make_clean'])

            # source the env
            print("Source environment ...")
            self._source_env()

        finally:
            print("Test_01_setup_End:------------------ took {} "
                  "secs\n\n".format(time.time() - t0))

    def test_02_make_fetch(self):
        """Run ``make fetch`` and verify every image it pulls is local."""
        print("Test_02_make_fetch_Start:------------------")
        t0 = time.time()

        try:
            # Get list of images to fetch from the Makefile
            print("Get list of images to fetch ...")
            makefile_lines = self._exec_ok_lines(
                command_defs['makefile_fetch_images'])

            images_to_fetch = []
            for makefile_line in makefile_lines:
                # Drop all whitespace, then the leading "dockerpull", which
                # leaves the "<repository>:<tag>" image name.
                squashed = ''.join(makefile_line.split())
                if not squashed:
                    continue  # ignore blank lines in the command output
                images_to_fetch.append(squashed[len('dockerpull'):])

            # make fetch
            print("Fetching images {} ...".format(images_to_fetch))
            self._exec_ok(command_defs['make_fetch'])

            # verify that the images have been downloaded
            print("Verify images downloaded and present locally ...")
            local_images = self._exec_ok_lines(command_defs['docker_images'])

            local_image_names = set()
            for row in local_images:
                words = row.split()
                # The first two columns are joined as "<repository>:<tag>";
                # rows with fewer than two columns (e.g. a blank line)
                # cannot name an image.
                if len(words) >= 2:
                    local_image_names.add(
                        '{}:{}'.format(words[0], words[1]))

            missing = [image for image in images_to_fetch
                       if image not in local_image_names]
            self.assertEqual(
                missing, [],
                'images not present locally: {}'.format(missing))

        finally:
            print("Test_02_make_fetch_End:------------------ took {} "
                  "secs \n\n".format(time.time() - t0))

    def test_03_make(self):
        """Run ``make`` and expect a zero exit code."""
        print("Test_03_make_Start:------------------")
        t0 = time.time()
        try:
            self._exec_ok(command_defs['make'])
        finally:
            print("Test_03_make_End:------------------ took {} secs \n\n"
                  .format(time.time() - t0))

    def test_04_run_voltha_standalone_without_consul(self):
        """Run the voltha image alone and check its start-up messages."""
        print("Test_04_run_voltha_standalone_without_consul_Start:------------"
              "------")
        t0 = time.time()

        try:
            # Run voltha for 10 secs and verify the following lines are
            # displayed (a subset of output messages)
            print("Start voltha ...")
            expected_output_subset = [
                'main.print_banner {event: (to stop: press Ctrl-C), '
                'instance_id:',
                'coordinator.__init__ {event: initialized-coordinator,',
                'grpc_server.run {event: starting-grpc-server,',
                'main.<lambda> {event: twisted-reactor-started',
                'main.startup_components {event: started-internal-services,',
                'kafka_proxy.send_message {event: Sending message Heartbeat '
                'message',
                'coordinator._backoff {retry_in: 5, event: consul-not-up,'
            ]

            cmd = command_defs['docker_start_voltha']
            command_output = run_long_running_command_with_timeout(cmd, 10)

            # There should at least be 1 line in the output
            self.assertGreater(len(command_output), 0)

            # Verify that the output contained the expected_output_subset -
            # save the docker instance id
            print("Verify voltha started correctly ...")
            instance_id = None
            for expected in expected_output_subset:
                match_str = next(
                    (line for line in command_output if expected in line),
                    None)
                self.assertIsNotNone(
                    match_str,
                    'expected output not found: {}'.format(expected))
                if "instance_id" in expected:
                    # The id is taken to be the last run of hex characters
                    # on the matching line.
                    instance_id = re.findall(r'[0-9a-f]+', match_str)[-1]

            # Now stop the voltha docker that was created
            print("Stop voltha ...")
            self._stop_docker_container_by_id(instance_id)

        finally:
            # Remove any created container
            self._stop_and_remove_all_containers()

            print("Test_04_run_voltha_standalone_without_consul_End"
                  ":------------------ took {} secs \n\n".format(
                      time.time() - t0))

    def test_05_run_consul_only(self):
        """Start consul alone and check its leader address is reachable."""
        print("Test_05_run_consul_only_Start:------------------ ")
        t0 = time.time()

        try:
            # run consul
            print("Start consul ...")
            self._run_consul()

            print("Waiting for consul to be ready ...")
            rc = self._wait_for_consul_to_be_ready()
            self.assertEqual(rc, 0)

            # Get the docker IP address and port number of the consul instance
            print("Get consul leader IP ...")
            leader = self._exec_ok(command_defs['consul_get_leader_ip_port'])

            # validate that the returned ip:port is valid and open
            print("Verify consul IP and port is reachable ...")
            self.assertTrue(is_open(leader))

        finally:
            # clean up all created containers for this test
            print("Stop consul ...")
            self._stop_and_remove_all_containers()

            print("Test_05_run_consul_only_End:------------------ took {} secs"
                  "\n\n".format(time.time() - t0))

    def test_06_run_voltha_standalone_with_consul_only(self):
        """Run voltha against consul; check (de)registration in consul."""
        print("Test_06_run_voltha_standalone_with_consul_only_Start:----------"
              "-------- ")
        t0 = time.time()

        try:
            # run consul first
            print("Start consul ...")
            self._run_consul()

            # get consul ip
            print("Get consul IP ...")
            consul_ip = self._exec_ok(command_defs['docker_get_consul_ip'])
            self.assertIsNotNone(consul_ip)

            # start voltha now for 10 secs and verify it can now connect to
            # consul - following message in the output
            print("Start voltha with consul IP ...")
            expected_pattern = ['coordinator', 'event: created-consul-session']
            cmd = command_defs['docker_start_voltha_with_consul_ip'] + \
                '{}:8500'.format(consul_ip.strip())
            command_output = run_long_running_command_with_timeout(cmd, 10)

            # Verify the output of voltha and get the container instance id
            print("Verify voltha is registered with consul ...")
            instance_id = None
            for line in command_output:
                # Every expected fragment must be present on the same line.
                if all(fragment in line for fragment in expected_pattern):
                    instance_id = re.findall(r'[0-9a-f]+', line)[-1]
                    break

            self.assertIsNotNone(instance_id)

            # Verify Voltha's self-registration with consul
            expected_output = ['ModifyIndex', 'CreateIndex', 'Session',
                               'Value',
                               'Flags', 'Key', 'LockIndex']

            registration_info = self._exec_ok(
                command_defs['consul_verify_voltha_registration'])
            # A parse error or a missing key must fail the test, so neither
            # is caught here.
            jr_info = json.loads(registration_info)
            intersect_elems = [e for e in jr_info[0] if
                               e in expected_output]
            self.assertEqual(len(expected_output), len(intersect_elems))

            # stop voltha
            print("Stop voltha ...")
            self._stop_docker_container_by_id(instance_id)

            # check the service has deregistered
            print("Verify voltha is no longer registered in consul...")
            registration_info = self._exec_ok(
                command_defs['consul_verify_voltha_registration'])
            self.assertEqual(registration_info, '')

        finally:
            # clean up all created containers for this test
            print("Stop consul ...")
            self._stop_and_remove_all_containers()

            print("Test_06_run_voltha_standalone_with_consul_only_End:--------"
                  "---------- took {} "
                  "secs \n\n".format(time.time() - t0))

    def test_07_start_all_containers(self):
        """Start the whole compose file and verify every service is up."""
        print("Test_07_start_all_containers_Start:------------------ ")
        t0 = time.time()

        try:
            # Pre-test - clean up all running docker containers
            print("Pre-test: Removing all running containers ...")
            self._stop_and_remove_all_containers()

            # get a list of services in the docker-compose file
            print("Getting list of services in docker compose file ...")
            services = self._exec_ok(command_defs['docker_compose_services'])
            docker_service_list = services.split()
            self.assertGreaterEqual(len(docker_service_list),
                                    DOCKER_COMPOSE_FILE_SERVICES_COUNT)

            # start all the containers
            print("Starting all containers ...")
            self._exec_ok(command_defs['docker_compose_start_all'])

            print("Waiting for all containers to be ready ...")
            rc, not_found_list = self._wait_for_all_containers_to_ready()
            if rc:
                print("Not found patterns:{}".format(not_found_list))
            self.assertEqual(rc, 0)

            # verify that all containers are running
            print("Verify all services are running using docker command ...")
            for service in docker_service_list:
                cmd = command_defs['docker_compose_ps'] + ' {} | wc -l'.format(
                    service)
                out = self._exec_ok(cmd)
                # `wc -l` prints text; convert it so that the comparison is
                # numeric.  2 of the lines are headers.
                self.assertGreaterEqual(int(out.strip()), 3)

            # Verify that 'docker ps' return the same number of running process
            out = self._exec_ok(command_defs['docker_ps_count'])
            self.assertEqual(out.split(), [str(len(docker_service_list))])

            # Retrieve the list of services from consul and validate against
            # the list of expected service names
            print("Verify all services are registered in consul ...")
            expected_services = ['consul-rest', 'fluentd-intake',
                                 'chameleon-rest', 'voltha-grpc',
                                 'voltha-health',
                                 'consul-8600', 'zookeeper', 'consul', 'kafka']

            out = self._exec_ok(command_defs['consul_get_services'])
            # Errors are deliberately not caught: a failure here must fail
            # the test.
            consul_services = json.loads(out)
            missing_services = [s for s in expected_services
                                if s not in consul_services]
            self.assertEqual(
                missing_services, [],
                'services not registered in consul: {}'.format(
                    missing_services))

            # Verify the service record of the voltha service
            print("Verify the service record of voltha in consul ...")
            expected_srv_elements = ['ModifyIndex', 'CreateIndex',
                                     'ServiceEnableTagOverride', 'Node',
                                     'Address', 'TaggedAddresses', 'ServiceID',
                                     'ServiceName', 'ServiceTags',
                                     'ServiceAddress', 'ServicePort']
            out = self._exec_ok(command_defs['consul_get_srv_voltha_health'])
            srv = json.loads(out)
            intersect_elems = [e for e in srv[0] if
                               e in expected_srv_elements]
            self.assertEqual(len(expected_srv_elements),
                             len(intersect_elems))

            # Verify kafka client is receiving the messages
            print("Verify kafka client is receiving the heartbeat messages ...")
            expected_pattern = ['voltha-heartbeat', 'Heartbeat message']
            cmd = command_defs['kafka_client_run_10_secs']
            kafka_client_output = run_long_running_command_with_timeout(cmd,
                                                                        20)

            # Verify the kafka client output: some line must contain every
            # expected fragment
            found = any(
                all(fragment in line for fragment in expected_pattern)
                for line in kafka_client_output)
            self.assertTrue(found)

            # verify docker-compose logs are being produced - just get the
            # first word of each line
            print("Verify docker compose logs has output from all the services "
                  "...")
            expected_output = ['voltha_1', 'fluentd_1', 'consul_1',
                               'registrator_1', 'kafka_1', 'zookeeper_1',
                               'chameleon_1']
            cmd = command_defs['docker_compose_logs']
            # NOTE(review): the third argument presumably selects which word
            # of each output line is returned - confirm against test_utils.
            docker_compose_logs = run_long_running_command_with_timeout(cmd, 5,
                                                                        0)
            intersected_logs = [l for l in expected_output if
                                l in docker_compose_logs]
            self.assertEqual(len(intersected_logs), len(expected_output))

            # TODO: also verify that fluentd logs are being produced (i.e.
            # that "voltha.logging" shows up in the output of
            # command_defs['fluentd_logs']); the file in /tmp/fluentd/
            # cannot be found

            # verify docker voltha logs are being produced - we will just
            # verify some key messages in the logs
            print("Verify docker voltha logs are produced ...")
            expected_output = ['kafka_proxy.send_message',
                               'coordinator._renew_session', 'main.heartbeat']
            cmd = command_defs['docker_voltha_logs']
            docker_voltha_logs = run_long_running_command_with_timeout(cmd,
                                                                       0.5, 2)
            intersected_logs = [l for l in expected_output if
                                l in docker_voltha_logs]
            self.assertEqual(len(intersected_logs), len(expected_output))

        finally:
            print("Stopping all containers ...")
            # clean up all created containers for this test
            self._stop_and_remove_all_containers()

            print("Test_07_start_all_containers_End:------------------ took {}"
                  " secs \n\n".format(time.time() - t0))

    def test_08_stop_all_containers_started_using_docker_compose(self):
        """Stop and remove the compose containers; expect none left."""
        print("Test_08_stop_all_containers_started_using_docker_compose_Start:"
              "------------------ ")
        t0 = time.time()

        try:
            # commands to stop and clear the docker containers
            cmds = [command_defs['docker_compose_stop'],
                    command_defs['docker_compose_rm_f']]

            print("Stopping all containers ...")
            for cmd in cmds:
                self._exec_ok(cmd)

            # Verify that no docker process is running
            print("Verify no containers is running...")
            out = self._exec_ok(
                command_defs['docker_compose_services_running'])
            # `ps -q` prints one container id per line, so any output means
            # that a container is still around.
            self.assertEqual(out.strip(), '')

        finally:
            print("Test_08_stop_all_containers_started_using_docker_compose_:"
                  "------------------ took {} secs \n\n".format(
                      time.time() - t0))

    def test_09_dig_consul_command(self):
        """Resolve voltha through consul DNS and connect to the result."""
        print("Test_09_dig_consul_command_Start:------------------")
        t0 = time.time()

        try:
            # start all containers
            print("Start all containers...")
            self._start_all_containers()

            print("Waiting for all containers to be ready ...")
            time.sleep(10)
            rc, not_found_list = self._wait_for_all_containers_to_ready()
            if rc:
                print("Not found patterns:{}".format(not_found_list))
            self.assertEqual(rc, 0)

            # Get the IP address(es) for voltha's REST interface
            print("Get IP of Voltha REST interface...")
            out = self._exec_ok(
                command_defs['consul_get_voltha_rest_a_record'])
            self.assertGreaterEqual(out.find("voltha-health.service.consul"),
                                    0)

            # Get only the ip address; raw stdout ends with a newline, which
            # must not end up inside the "ip:port" string built below
            ip = self._exec_ok(
                command_defs['consul_get_voltha_rest_ip']).strip()
            self.assertTrue(is_valid_ip(ip))

            # Get the exposed service port
            print("Get Voltha exposed service port...")
            port = self._exec_ok(
                command_defs['consul_get_voltha_service_port']).strip()

            # Verify that we can connect to the port using the previously
            # acquired ip
            print("Verify connectivity with voltha ip and port...")
            self.assertTrue(is_open('{}:{}'.format(ip, port)))
        finally:
            print("Stopping all containers ...")
            # clean up all created containers for this test
            self._stop_and_remove_all_containers()

            print("Test_09_dig_consul_command_End:------------------"
                  "took {} secs \n\n".format(time.time() - t0))

    def test_10_scale_voltha(self):
        """Scale voltha to 10 instances and count the running ones."""
        print("Test_10_scale_voltha_Start:------------------")
        t0 = time.time()

        try:
            # start all containers
            print("Start all containers...")
            self._start_all_containers()

            print("Waiting for all containers to be ready ...")
            time.sleep(10)
            rc, not_found_list = self._wait_for_all_containers_to_ready()
            if rc:
                print("Not found patterns:{}".format(not_found_list))
            self.assertEqual(rc, 0)

            # Scale voltha to 10 instances
            print("Scale voltha to 10 instances ...")
            self._exec_ok(command_defs['docker_compose_scale_voltha_to_10'])

            # Verify that 10 instances are running
            print("Verify 10 instances of voltha are running ...")
            out = self._exec_ok(
                command_defs['docker_compose_scaled_voltha_ps'])
            self.assertEqual(out.split(), ['10'])
        finally:
            print("Stopping all containers ...")
            # clean up all created containers for this test
            self._stop_and_remove_all_containers()

            print("Test_10_scale_voltha_End:------------------took {} secs "
                  "\n\n".format(time.time() - t0))

    # ------------------------------------------------------------------
    # helpers
    # ------------------------------------------------------------------

    def _exec_ok(self, cmd):
        """Run *cmd* to completion, require exit code 0, return raw stdout."""
        out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
        self.assertEqual(
            rc, 0, 'command {!r} exited with {} ({})'.format(cmd, rc, err))
        return out

    def _exec_ok_lines(self, cmd):
        """Run *cmd* to completion, require exit code 0, return stdout as a
        list (as produced by run_command_to_completion_with_stdout_in_list).
        """
        out, err, rc = run_command_to_completion_with_stdout_in_list(cmd)
        self.assertEqual(
            rc, 0, 'command {!r} exited with {} ({})'.format(cmd, rc, err))
        return out

    def _start_all_containers(self):
        """Bring up every service of the docker-compose file."""
        self._exec_ok(command_defs['docker_compose_start_all'])

    def _run_consul(self):
        """Start the consul service and check that compose lists it."""
        # run consul
        self._exec_ok(command_defs['docker_compose_start_consul'])

        # verify consul is up
        out = self._exec_ok(command_defs['docker_compose_is_consul_up'])
        self.assertIn('compose_consul_1', out)

    def _stop_and_remove_all_containers(self):
        """Stop and remove all containers if ``docker ps`` lists any."""
        # check if there are any running containers first
        listing = self._exec_ok_lines(command_defs['docker_ps'])
        if len(listing) > 1:  # not counting docker ps header
            self._exec_ok(
                command_defs['docker_stop_and_remove_all_containers'])

    def _stop_docker_container_by_id(self, instance_id):
        """``docker stop`` then ``docker rm`` the container *instance_id*."""
        # stop
        self._exec_ok(command_defs['docker_stop'] + " {}".format(instance_id))

        # remove
        self._exec_ok(command_defs['docker_rm'] + " {}".format(instance_id))

    def _source_env(self):
        """Source ``env.sh`` in a bash child process.

        Returns a copy of ``os.environ``.
        Raises RuntimeError when bash exits with a non-zero status.
        """
        # NOTE(review): this 'cd' runs in a child shell, so it does not
        # change the working directory of the test process; in effect it
        # only checks that this_dir can be entered.
        res = os.system('cd {}'.format(this_dir))
        self.assertEqual(res, 0)

        # set the env
        command = ['bash', '-c', '. env.sh']
        proc = subprocess.Popen(command, stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)

        # communicate() drains both pipes while waiting; a plain wait() can
        # block forever once the child has filled a pipe buffer.
        proc.communicate()
        if proc.returncode != 0:
            err_msg = "Failed to source the environment'"
            raise RuntimeError(err_msg)

        # NOTE(review): env.sh was sourced in the child only, so the copy
        # returned here is the unchanged environment of this process.
        env = os.environ.copy()
        return env

    def _wait_for_consul_to_be_ready(self):
        """Poll consul until it reports a leader.

        Consul is considered ready when its leader ip and port is set.
        Returns 0 once the leader query prints something, -1 when the query
        command fails or nothing was reported after about 60 seconds.
        """
        max_wait_time = 60
        t0 = time.time()
        cmd = command_defs['consul_get_leader_ip_port']

        while True:
            print("waiting for consul to be ready ...")
            out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
            if rc != 0:
                # Something is wrong, return
                return -1  # error
            if out and out.strip():
                return 0  # found something
            if time.time() - t0 > max_wait_time:
                return -1  # consul should have come up by this time
            time.sleep(2)  # constant sleep for testing

    def _wait_for_all_containers_to_ready(self):
        """Watch the docker-compose logs until all services look ready.

        After the containers have been started using docker-compose, the
        log output is matched (case-insensitively, from the start of each
        line, ANSI colour codes removed) against one or more patterns per
        service: registrator, voltha, fluentd, chameleon and the consul
        "Synced service" messages for the registered services.

        Returns a ``(rc, not_found)`` tuple: ``(0, [])`` when every pattern
        was seen, ``(-1, <patterns not seen>)`` when more than 2 minutes
        elapsed or the log stream ended first, ``(-1, [])`` on an
        unexpected exception.
        """
        expected_output = [
            "(.*)registrator_1(.*)Listening for Docker events",
            "(.*)voltha_1(.*)main.heartbeat {status: up, uptime:",
            "(.*)voltha_1(.*)kafka_proxy.send_message {event: Successfully "
            "sent message Heartbeat message",
            "(.*)fluentd_1(.*)listening fluent socket on",
            "(.*)chameleon_1(.*)main.startup_components {event: "
            "started-internal-services, instance_id: compose_chameleon_1}",
            "(.*)consul_1(.*)agent: Synced service(.*)consul(.*)8500",
            "(.*)consul_1(.*)agent: Synced service(.*)consul(.*)8600:udp",
            "(.*)consul_1(.*)agent: Synced service(.*)fluentd",
            "(.*)consul_1(.*)agent: Synced service(.*)voltha(.*):(?!8880)",
            "(.*)consul_1(.*)agent: Synced service(.*)voltha(.*):8880",
            "(.*)consul_1(.*)agent: Synced service(.*)chameleon(.*):8881",
            "(.*)consul_1(.*)agent: Synced service(.*)zookeeper(.*):2181",
            "(.*)consul_1(.*)agent: Synced service(.*)kafka(.*):9092"
        ]
        pattern_found = []
        max_wait_time = 120  # wait 2 mins as a maximum
        # Compiled once; it is applied to every log line.
        ansi_escape = re.compile(r'\x1b[^m]*m')

        def _stop_process(proc):
            try:
                proc.terminate()
                proc.wait()
                # In principle this 'reset' should not be required.
                # However, without it, the terminal is left in a funny
                # state
                subprocess.Popen(['reset']).wait()
            except Exception as e:
                print("Received exception {} when killing process "
                      .format(repr(e), ))

        def _not_found():
            return [p for p in expected_output if p not in pattern_found]

        proc = None
        try:
            t0 = time.time()
            env = os.environ.copy()
            # NOTE(review): stderr is piped but never read; a very chatty
            # stderr could stall the child.
            proc = subprocess.Popen(
                command_defs['docker_compose_logs'],
                env=env,
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                bufsize=1
            )
            # NOTE: the elapsed time is only checked when a line arrives, so
            # a silent log stream is not interrupted by the time limit.
            for line in iter(proc.stdout.readline, b''):
                line = ansi_escape.sub('', line)
                # Credit the line to the first pattern it matches that has
                # not been seen yet.
                for pattern in expected_output:
                    if pattern in pattern_found:
                        continue
                    if re.match(pattern, line, re.I):
                        pattern_found.append(pattern)
                        break
                # Check if we found all patterns yet
                if len(pattern_found) == len(expected_output):
                    _stop_process(proc)
                    return 0, []  # success
                elif time.time() - t0 > max_wait_time:
                    _stop_process(proc)
                    return -1, _not_found()  # failure

            # The log stream ended before every pattern showed up.  Report
            # it as a failure; callers unpack a 2-tuple, so falling off the
            # end (returning None) would make them raise instead.
            _stop_process(proc)
            return -1, _not_found()
        except Exception as e:
            print('Exception {} '.format(repr(e)))
            if proc is not None:
                # do not leave the log reader running behind
                _stop_process(proc)
            return -1, []