This update fixes an issue that occurs when a device is deleted: the
channel-closed event was not detected correctly.

Change-Id: If832fdb65c783a4e965bd179884595ce013f468e
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index c527619..72abe9e 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -269,7 +269,16 @@
 		// Remove the subscription for a response on return
 		defer kp.unSubscribeForResponse(protoRequest.Header.Id)
 		select {
-		case msg := <-ch:
+		case msg, ok := <-ch:
+			if !ok {
+				log.Warnw("channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
+				protoError := &ic.Error{Reason: "channel-closed"}
+				var marshalledArg *any.Any
+				if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
+					return false, nil // Should never happen
+				}
+				return false, marshalledArg
+			}
 			log.Debugw("received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
 			var responseBody *ic.InterContainerResponseBody
 			var err error
@@ -467,9 +476,14 @@
 
 func (kp *InterContainerProxy) DeleteTopic(topic Topic) error {
 	// If we have any consumers on that topic we need to close them
-	kp.deleteFromTopicResponseChannelMap(topic.Name)
-	kp.deleteFromTopicRequestHandlerChannelMap(topic.Name)
+	if err := kp.deleteFromTopicResponseChannelMap(topic.Name); err != nil {
+		log.Errorw("delete-from-topic-responsechannelmap-failed", log.Fields{"error": err})
+	}
+	if err := kp.deleteFromTopicRequestHandlerChannelMap(topic.Name); err != nil {
+		log.Errorw("delete-from-topic-requesthandlerchannelmap-failed", log.Fields{"error": err})
+	}
 	kp.deleteTopicTransactionIdToChannelMap(topic.Name)
+
 	return kp.kafkaClient.DeleteTopic(&topic)
 }
 
@@ -676,8 +690,13 @@
 startloop:
 	for {
 		select {
-		case msg := <-subscribedCh:
-			log.Debugw("message-received", log.Fields{"msg": msg, "fromTopic": msg.Header.FromTopic})
+		case msg, ok := <-subscribedCh:
+			if !ok {
+				log.Debugw("channel-closed", log.Fields{"topic": topic.Name})
+				break startloop
+			}
+			log.Debugw("message-received", log.Fields{"msg": msg})
+			//log.Debugw("message-received", log.Fields{"msg": msg, "fromTopic": msg.Header.FromTopic})
 			if msg.Header.Type == ic.MessageType_RESPONSE {
 				go kp.dispatchResponse(msg)
 			}
diff --git a/kafka/sarama_client.go b/kafka/sarama_client.go
index 10c692a..e669940 100644
--- a/kafka/sarama_client.go
+++ b/kafka/sarama_client.go
@@ -722,19 +722,17 @@
 startloop:
 	for {
 		select {
-		case err := <-consumer.Errors():
-			if err != nil {
+		case err, ok := <-consumer.Errors():
+			if ok {
 				log.Warnw("partition-consumers-error", log.Fields{"error": err})
 			} else {
-				// There is a race condition when this loop is stopped and the consumer is closed where
-				// the actual error comes as nil
-				log.Warn("partition-consumers-error")
+				// Channel is closed
+				break startloop
 			}
-		case msg := <-consumer.Messages():
+		case msg, ok := <-consumer.Messages():
 			//log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
-			if msg == nil {
-				// There is a race condition when this loop is stopped and the consumer is closed where
-				// the actual msg comes as nil
+			if !ok {
+				// channel is closed
 				break startloop
 			}
 			msgBody := msg.Value
@@ -755,33 +753,19 @@
 func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
 	log.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
 
-	//go func() {
-	//	for msg := range consumer.Errors() {
-	//		log.Warnw("group-consumers-error", log.Fields{"error": msg.Error()})
-	//	}
-	//}()
-	//
-	//go func() {
-	//	for ntf := range consumer.Notifications() {
-	//		log.Debugw("group-received-notification", log.Fields{"notification": ntf})
-	//	}
-	//}()
-
 startloop:
 	for {
 		select {
-		case err := <-consumer.Errors():
-			if err != nil {
+		case err, ok := <-consumer.Errors():
+			if ok {
 				log.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
 			} else {
-				// There is a race condition when this loop is stopped and the consumer is closed where
-				// the actual error comes as nil
-				log.Warnw("group-consumers-error-nil", log.Fields{"topic": topic.Name})
+				// channel is closed
+				break startloop
 			}
-		case msg := <-consumer.Messages():
-			if msg == nil {
-				// There is a race condition when this loop is stopped and the consumer is closed where
-				// the actual msg comes as nil
+		case msg, ok := <-consumer.Messages():
+			if !ok {
+				// Channel closed
 				break startloop
 			}
 			log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
@@ -911,6 +895,7 @@
 		if channel == ch {
 			channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
 			close(channel)
+			log.Debug("channel-closed")
 			return channels[:len(channels)-1]
 		}
 	}