Moving build md tests to use kafkacat

The test now checks that the voltha.heartbeat topic exists and
that heartbeat messages are received on it.

Change-Id: Ic48c6a42898ce68b7d827d0034824207c3555af6
diff --git a/tests/itests/docutests/build_md_test.py b/tests/itests/docutests/build_md_test.py
index 1e42a3b..554b898 100644
--- a/tests/itests/docutests/build_md_test.py
+++ b/tests/itests/docutests/build_md_test.py
@@ -17,7 +17,7 @@
 import subprocess
 import time
 import logging
-from common.utils.consulhelpers import verify_all_services_healthy
+from common.utils.consulhelpers import verify_all_services_healthy, get_endpoint_from_consul
 import os
 import json
 from unittest import TestCase
@@ -89,7 +89,8 @@
         .format(LOCAL_CONSUL_URL),
     consul_get_srv_voltha_health="curl -s {}/v1/catalog/service/voltha-health "
                                  "| jq -r .".format(LOCAL_CONSUL_URL),
-    kafka_client_run_10_secs="python kafka/kafka-consumer.py -r 10",
+    kafka_client_run="kafkacat -b {} -L",
+    kafka_client_heart_check="kafkacat -b {} -C -t voltha.heartbeat -c 1",
     consul_get_voltha_rest_a_record="dig {} voltha-health.service.consul"
         .format(LOCAL_CONSUL_DNS),
     consul_get_voltha_rest_ip="dig {} +short voltha-health.service.consul"
@@ -443,12 +444,29 @@
                 self.assertRaises(e)
 
             # Verify kafka client is receiving the messages
-            print "Verify kafka client is receiving the heartbeat messages ..."
-            expected_pattern = ['voltha.heartbeat', 'heartbeat']
-            cmd = command_defs['kafka_client_run_10_secs']
-            kafka_client_output = run_long_running_command_with_timeout(cmd,
-                                                                        20)
+            print "Verify kafka client has heartbeat topic ..."
+            expected_pattern = ['voltha.heartbeat']
+            kafka_endpoint = get_endpoint_from_consul(LOCAL_CONSUL,'kafka')
+            cmd = command_defs['kafka_client_run'].format(kafka_endpoint)
+            kafka_client_output = run_long_running_command_with_timeout(cmd, 20)
 
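+            # Verify that the kafkacat metadata listing names the heartbeat topic.
+            # NOTE(review): all() over a filtered generator is also True when no
+            # pattern matches, so any non-empty output passes; confirm intent.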
+            found = False
+            for out in kafka_client_output:
+                if all(ep for ep in expected_pattern if ep in out):
+                    found = True
+                    break
+            self.assertTrue(found)
+
+            print "Verify kafka client is receiving the heartbeat messages from voltha..."
+            expected_pattern = ['heartbeat', 'compose_voltha_1']
+            cmd = command_defs['kafka_client_heart_check'].format(kafka_endpoint)
+            kafka_client_output = run_long_running_command_with_timeout(cmd, 20)
+
+            print kafka_client_output
+            # TODO check that there are heartbeats
             # Verify the kafka client output
             # instance id
             found = False