This update addresses an issue when a device is deleted. The
channel closed event was not captured correctly.
Change-Id: If832fdb65c783a4e965bd179884595ce013f468e
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index c527619..72abe9e 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -269,7 +269,16 @@
// Remove the subscription for a response on return
defer kp.unSubscribeForResponse(protoRequest.Header.Id)
select {
- case msg := <-ch:
+ case msg, ok := <-ch:
+ if !ok {
+ log.Warnw("channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
+ protoError := &ic.Error{Reason: "channel-closed"}
+ var marshalledArg *any.Any
+ if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
+ return false, nil // Should never happen
+ }
+ return false, marshalledArg
+ }
log.Debugw("received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
var responseBody *ic.InterContainerResponseBody
var err error
@@ -467,9 +476,14 @@
func (kp *InterContainerProxy) DeleteTopic(topic Topic) error {
// If we have any consumers on that topic we need to close them
- kp.deleteFromTopicResponseChannelMap(topic.Name)
- kp.deleteFromTopicRequestHandlerChannelMap(topic.Name)
+ if err := kp.deleteFromTopicResponseChannelMap(topic.Name); err != nil {
+ log.Errorw("delete-from-topic-responsechannelmap-failed", log.Fields{"error": err})
+ }
+ if err := kp.deleteFromTopicRequestHandlerChannelMap(topic.Name); err != nil {
+ log.Errorw("delete-from-topic-requesthandlerchannelmap-failed", log.Fields{"error": err})
+ }
kp.deleteTopicTransactionIdToChannelMap(topic.Name)
+
return kp.kafkaClient.DeleteTopic(&topic)
}
@@ -676,8 +690,13 @@
startloop:
for {
select {
- case msg := <-subscribedCh:
- log.Debugw("message-received", log.Fields{"msg": msg, "fromTopic": msg.Header.FromTopic})
+ case msg, ok := <-subscribedCh:
+ if !ok {
+ log.Debugw("channel-closed", log.Fields{"topic": topic.Name})
+ break startloop
+ }
+ log.Debugw("message-received", log.Fields{"msg": msg})
+ //log.Debugw("message-received", log.Fields{"msg": msg, "fromTopic": msg.Header.FromTopic})
if msg.Header.Type == ic.MessageType_RESPONSE {
go kp.dispatchResponse(msg)
}
diff --git a/kafka/sarama_client.go b/kafka/sarama_client.go
index 10c692a..e669940 100644
--- a/kafka/sarama_client.go
+++ b/kafka/sarama_client.go
@@ -722,19 +722,17 @@
startloop:
for {
select {
- case err := <-consumer.Errors():
- if err != nil {
+ case err, ok := <-consumer.Errors():
+ if ok {
log.Warnw("partition-consumers-error", log.Fields{"error": err})
} else {
- // There is a race condition when this loop is stopped and the consumer is closed where
- // the actual error comes as nil
- log.Warn("partition-consumers-error")
+ // Channel is closed
+ break startloop
}
- case msg := <-consumer.Messages():
+ case msg, ok := <-consumer.Messages():
//log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
- if msg == nil {
- // There is a race condition when this loop is stopped and the consumer is closed where
- // the actual msg comes as nil
+ if !ok {
+ // channel is closed
break startloop
}
msgBody := msg.Value
@@ -755,33 +753,19 @@
func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
log.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
- //go func() {
- // for msg := range consumer.Errors() {
- // log.Warnw("group-consumers-error", log.Fields{"error": msg.Error()})
- // }
- //}()
- //
- //go func() {
- // for ntf := range consumer.Notifications() {
- // log.Debugw("group-received-notification", log.Fields{"notification": ntf})
- // }
- //}()
-
startloop:
for {
select {
- case err := <-consumer.Errors():
- if err != nil {
+ case err, ok := <-consumer.Errors():
+ if ok {
log.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
} else {
- // There is a race condition when this loop is stopped and the consumer is closed where
- // the actual error comes as nil
- log.Warnw("group-consumers-error-nil", log.Fields{"topic": topic.Name})
+ // channel is closed
+ break startloop
}
- case msg := <-consumer.Messages():
- if msg == nil {
- // There is a race condition when this loop is stopped and the consumer is closed where
- // the actual msg comes as nil
+ case msg, ok := <-consumer.Messages():
+ if !ok {
+ // Channel closed
break startloop
}
log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
@@ -911,6 +895,7 @@
if channel == ch {
channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
close(channel)
+ log.Debug("channel-closed")
return channels[:len(channels)-1]
}
}
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index c287c10..b26b161 100644
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -158,14 +158,11 @@
success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
log.Debugw("DeleteDevice-response", log.Fields{"deviceid": device.Id, "success": success})
- // We no longer need to have a target against that topic as we won't receive any unsolicited messages on that
- // topic
- if err := ap.kafkaICProxy.UnSubscribeFromRequestHandler(replyToTopic); err != nil {
- log.Errorw("Unable-to-subscribe-from-target", log.Fields{"topic": replyToTopic, "error": err})
+ // We no longer need to have this device topic as we won't receive any unsolicited messages on it
+ if err := ap.kafkaICProxy.DeleteTopic(replyToTopic); err != nil {
+ log.Errorw("Unable-to-delete-topic", log.Fields{"topic": replyToTopic, "error": err})
return err
}
- // Now delete the topic altogether
- ap.kafkaICProxy.DeleteTopic(replyToTopic)
return unPackResponse(rpc, device.Id, success, result)
}