VOL-2631 Cleanup Kafka Interadapter Proxy

Change-Id: I4922e57de1727494d5a306226f67c6300eb29437
diff --git a/VERSION b/VERSION
index f93fc9f..eea30e5 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-3.0.12
+3.0.13
diff --git a/pkg/kafka/kafka_inter_container_library.go b/pkg/kafka/kafka_inter_container_library.go
index aa77ffb..91b2143 100644
--- a/pkg/kafka/kafka_inter_container_library.go
+++ b/pkg/kafka/kafka_inter_container_library.go
@@ -189,9 +189,15 @@
 	kp.doneOnce.Do(func() { close(kp.doneCh) })
 	// TODO : Perform cleanup
 	kp.kafkaClient.Stop()
-	//kp.deleteAllTopicRequestHandlerChannelMap()
-	//kp.deleteAllTopicResponseChannelMap()
-	//kp.deleteAllTransactionIdToChannelMap()
+	err := kp.deleteAllTopicRequestHandlerChannelMap()
+	if err != nil {
+		log.Errorw("failed-delete-all-topic-request-handler-channel-map", log.Fields{"error": err})
+	}
+	err = kp.deleteAllTopicResponseChannelMap()
+	if err != nil {
+		log.Errorw("failed-delete-all-topic-response-channel-map", log.Fields{"error": err})
+	}
+	kp.deleteAllTransactionIdToChannelMap()
 }
 
 func (kp *interContainerProxy) GetDefaultTopic() *Topic {
@@ -409,17 +415,25 @@
 
 // nolint: unused
 func (kp *interContainerProxy) deleteAllTopicResponseChannelMap() error {
+	logger.Debug("delete-all-topic-response-channel")
 	kp.lockTopicResponseChannelMap.Lock()
 	defer kp.lockTopicResponseChannelMap.Unlock()
-	var err error
+	var unsubscribeFailTopics []string
 	for topic := range kp.topicToResponseChannelMap {
 		// Unsubscribe to this topic first - this will close the subscribed channel
-		if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
+		if err := kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
+			unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
 			logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
+			// Do not return. Continue to try to unsubscribe to other topics.
+		} else {
+			// Only delete from channel map if successfully unsubscribed.
+			delete(kp.topicToResponseChannelMap, topic)
 		}
-		delete(kp.topicToResponseChannelMap, topic)
 	}
-	return err
+	if len(unsubscribeFailTopics) > 0 {
+		return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
+	}
+	return nil
 }
 
 func (kp *interContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
@@ -447,17 +461,25 @@
 
 // nolint: unused
 func (kp *interContainerProxy) deleteAllTopicRequestHandlerChannelMap() error {
+	logger.Debug("delete-all-topic-request-channel")
 	kp.lockTopicRequestHandlerChannelMap.Lock()
 	defer kp.lockTopicRequestHandlerChannelMap.Unlock()
-	var err error
+	var unsubscribeFailTopics []string
 	for topic := range kp.topicToRequestHandlerChannelMap {
 		// Close the kafka client client first by unsubscribing to this topic
-		if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
+		if err := kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
+			unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
 			logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
+			// Do not return. Continue to try to unsubscribe to other topics.
+		} else {
+			// Only delete from channel map if successfully unsubscribed.
+			delete(kp.topicToRequestHandlerChannelMap, topic)
 		}
-		delete(kp.topicToRequestHandlerChannelMap, topic)
 	}
-	return err
+	if len(unsubscribeFailTopics) > 0 {
+		return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
+	}
+	return nil
 }
 
 func (kp *interContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
@@ -491,6 +513,7 @@
 
 // nolint: unused
 func (kp *interContainerProxy) deleteAllTransactionIdToChannelMap() {
+	logger.Debug("delete-all-transaction-id-channel-map")
 	kp.lockTransactionIdToChannelMap.Lock()
 	defer kp.lockTransactionIdToChannelMap.Unlock()
 	for key, value := range kp.transactionIdToChannelMap {