VOL-2631 Cleanup Kafka Interadapter Proxy
Change-Id: I4922e57de1727494d5a306226f67c6300eb29437
diff --git a/VERSION b/VERSION
index f93fc9f..eea30e5 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-3.0.12
+3.0.13
diff --git a/pkg/kafka/kafka_inter_container_library.go b/pkg/kafka/kafka_inter_container_library.go
index aa77ffb..91b2143 100644
--- a/pkg/kafka/kafka_inter_container_library.go
+++ b/pkg/kafka/kafka_inter_container_library.go
@@ -189,9 +189,15 @@
kp.doneOnce.Do(func() { close(kp.doneCh) })
// TODO : Perform cleanup
kp.kafkaClient.Stop()
- //kp.deleteAllTopicRequestHandlerChannelMap()
- //kp.deleteAllTopicResponseChannelMap()
- //kp.deleteAllTransactionIdToChannelMap()
+ err := kp.deleteAllTopicRequestHandlerChannelMap()
+ if err != nil {
+ log.Errorw("failed-delete-all-topic-request-handler-channel-map", log.Fields{"error": err})
+ }
+ err = kp.deleteAllTopicResponseChannelMap()
+ if err != nil {
+ log.Errorw("failed-delete-all-topic-response-channel-map", log.Fields{"error": err})
+ }
+ kp.deleteAllTransactionIdToChannelMap()
}
func (kp *interContainerProxy) GetDefaultTopic() *Topic {
@@ -409,17 +415,25 @@
// nolint: unused
func (kp *interContainerProxy) deleteAllTopicResponseChannelMap() error {
+ logger.Debug("delete-all-topic-response-channel")
kp.lockTopicResponseChannelMap.Lock()
defer kp.lockTopicResponseChannelMap.Unlock()
- var err error
+ var unsubscribeFailTopics []string
for topic := range kp.topicToResponseChannelMap {
// Unsubscribe to this topic first - this will close the subscribed channel
- if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
+ if err := kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
+ unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
+ // Do not return. Continue to try to unsubscribe to other topics.
+ } else {
+ // Only delete from channel map if successfully unsubscribed.
+ delete(kp.topicToResponseChannelMap, topic)
}
- delete(kp.topicToResponseChannelMap, topic)
}
- return err
+ if len(unsubscribeFailTopics) > 0 {
+ return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
+ }
+ return nil
}
func (kp *interContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
@@ -447,17 +461,25 @@
// nolint: unused
func (kp *interContainerProxy) deleteAllTopicRequestHandlerChannelMap() error {
+ logger.Debug("delete-all-topic-request-channel")
kp.lockTopicRequestHandlerChannelMap.Lock()
defer kp.lockTopicRequestHandlerChannelMap.Unlock()
- var err error
+ var unsubscribeFailTopics []string
for topic := range kp.topicToRequestHandlerChannelMap {
// Close the kafka client client first by unsubscribing to this topic
- if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
+ if err := kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
+ unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
+ // Do not return. Continue to try to unsubscribe to other topics.
+ } else {
+ // Only delete from channel map if successfully unsubscribed.
+ delete(kp.topicToRequestHandlerChannelMap, topic)
}
- delete(kp.topicToRequestHandlerChannelMap, topic)
}
- return err
+ if len(unsubscribeFailTopics) > 0 {
+ return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
+ }
+ return nil
}
func (kp *interContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
@@ -491,6 +513,7 @@
// nolint: unused
func (kp *interContainerProxy) deleteAllTransactionIdToChannelMap() {
+ logger.Debug("delete-all-transaction-id-channel-map")
kp.lockTransactionIdToChannelMap.Lock()
defer kp.lockTransactionIdToChannelMap.Unlock()
for key, value := range kp.transactionIdToChannelMap {