This update consists of the following changes:
    1) Add GroupConsumer to the Go sarama_client and modify the Core
    code to use a groupConsumer instead of a partition consumer. This
    change will ensure that multiple consumers (with different group Ids)
    can consume kafka messages from the same topic.
    2) Remove afkak kafka client and replace it with confluent kafka,
    a change done in voltha 1.x. Modify the code accordingly.
    3) Add a Group Consumer to the Python kafka client such that
    several instances of an Adapter can consume the same messages from
    the same topic.
    4) Set the datapath_id for the logical device in the Core.

Change-Id: I5d7ced27c9aeca4f6211baa3dc8cb3db861545e4
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index 3381fdc..c527619 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -33,12 +33,12 @@
 
 // Initialize the logger - gets the default until the main function setup the logger
 func init() {
-	log.AddPackage(log.JSON, log.WarnLevel, nil)
+	log.AddPackage(log.JSON, log.DebugLevel, nil)
 }
 
 const (
 	DefaultMaxRetries     = 3
-	DefaultRequestTimeout = 500 // 500 milliseconds - to handle a wider latency range
+	DefaultRequestTimeout = 3000 // 3000 milliseconds - to handle a wider latency range
 )
 
 // requestHandlerChannel represents an interface associated with a channel.  Whenever, an event is
@@ -172,7 +172,7 @@
 	log.Info("stopping-intercontainer-proxy")
 	kp.doneCh <- 1
 	// TODO : Perform cleanup
-	//kp.kafkaClient.Stop()
+	kp.kafkaClient.Stop()
 	//kp.deleteAllTopicRequestHandlerChannelMap()
 	//kp.deleteAllTopicResponseChannelMap()
 	//kp.deleteAllTransactionIdToChannelMap()
@@ -331,6 +331,7 @@
 	var err error
 	if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
 		log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
+		return err
 	}
 	kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
 
@@ -676,7 +677,7 @@
 	for {
 		select {
 		case msg := <-subscribedCh:
-			//log.Debugw("message-received", log.Fields{"msg": msg, "fromTopic": msg.Header.FromTopic})
+			log.Debugw("message-received", log.Fields{"msg": msg, "fromTopic": msg.Header.FromTopic})
 			if msg.Header.Type == ic.MessageType_RESPONSE {
 				go kp.dispatchResponse(msg)
 			}
@@ -700,6 +701,7 @@
 	// First check whether we already have a channel listening for response on that topic.  If there is
 	// already one then it will be reused.  If not, it will be created.
 	if !kp.isTopicSubscribedForResponse(topic.Name) {
+		log.Debugw("not-subscribed-for-response", log.Fields{"topic": topic.Name, "trnsid": trnsId})
 		var subscribedCh <-chan *ic.InterContainerMessage
 		var err error
 		if subscribedCh, err = kp.kafkaClient.Subscribe(&topic); err != nil {