VOL-2631 Cleanup Kafka Interadapter Proxy on shutdown;
Exit gracefully rather than Fatal() on kafka shutdown

Change-Id: Ic4506a8f4e5127d10a0db130a81584c0bf8c8428
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
index aa77ffb..91b2143 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
@@ -189,9 +189,15 @@
 	kp.doneOnce.Do(func() { close(kp.doneCh) })
 	// TODO : Perform cleanup
 	kp.kafkaClient.Stop()
-	//kp.deleteAllTopicRequestHandlerChannelMap()
-	//kp.deleteAllTopicResponseChannelMap()
-	//kp.deleteAllTransactionIdToChannelMap()
+	err := kp.deleteAllTopicRequestHandlerChannelMap()
+	if err != nil {
+		log.Errorw("failed-delete-all-topic-request-handler-channel-map", log.Fields{"error": err})
+	}
+	err = kp.deleteAllTopicResponseChannelMap()
+	if err != nil {
+		log.Errorw("failed-delete-all-topic-response-channel-map", log.Fields{"error": err})
+	}
+	kp.deleteAllTransactionIdToChannelMap()
 }
 
 func (kp *interContainerProxy) GetDefaultTopic() *Topic {
@@ -409,17 +415,25 @@
 
 // nolint: unused
 func (kp *interContainerProxy) deleteAllTopicResponseChannelMap() error {
+	logger.Debug("delete-all-topic-response-channel")
 	kp.lockTopicResponseChannelMap.Lock()
 	defer kp.lockTopicResponseChannelMap.Unlock()
-	var err error
+	var unsubscribeFailTopics []string
 	for topic := range kp.topicToResponseChannelMap {
 		// Unsubscribe to this topic first - this will close the subscribed channel
-		if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
+		if err := kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
+			unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
 			logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
+			// Do not return. Continue to try to unsubscribe to other topics.
+		} else {
+			// Only delete from channel map if successfully unsubscribed.
+			delete(kp.topicToResponseChannelMap, topic)
 		}
-		delete(kp.topicToResponseChannelMap, topic)
 	}
-	return err
+	if len(unsubscribeFailTopics) > 0 {
+		return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
+	}
+	return nil
 }
 
 func (kp *interContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
@@ -447,17 +461,25 @@
 
 // nolint: unused
 func (kp *interContainerProxy) deleteAllTopicRequestHandlerChannelMap() error {
+	logger.Debug("delete-all-topic-request-channel")
 	kp.lockTopicRequestHandlerChannelMap.Lock()
 	defer kp.lockTopicRequestHandlerChannelMap.Unlock()
-	var err error
+	var unsubscribeFailTopics []string
 	for topic := range kp.topicToRequestHandlerChannelMap {
 		// Close the kafka client client first by unsubscribing to this topic
-		if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
+		if err := kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
+			unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
 			logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
+			// Do not return. Continue to try to unsubscribe to other topics.
+		} else {
+			// Only delete from channel map if successfully unsubscribed.
+			delete(kp.topicToRequestHandlerChannelMap, topic)
 		}
-		delete(kp.topicToRequestHandlerChannelMap, topic)
 	}
-	return err
+	if len(unsubscribeFailTopics) > 0 {
+		return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
+	}
+	return nil
 }
 
 func (kp *interContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
@@ -491,6 +513,7 @@
 
 // nolint: unused
 func (kp *interContainerProxy) deleteAllTransactionIdToChannelMap() {
+	logger.Debug("delete-all-transaction-id-channel-map")
 	kp.lockTransactionIdToChannelMap.Lock()
 	defer kp.lockTransactionIdToChannelMap.Unlock()
 	for key, value := range kp.transactionIdToChannelMap {