[VOL-2101] adding the update status in partition consumer
commit message updated
tag updated in VERSION file
Change-Id: I534dcc63ae0d66dd5939d9d8f547c76d3111eca5
diff --git a/VERSION b/VERSION
index ef93bcc..0c2c783 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-2.2.19
+2.2.20
diff --git a/pkg/kafka/sarama_client.go b/pkg/kafka/sarama_client.go
index 73025d9..ff521a7 100755
--- a/pkg/kafka/sarama_client.go
+++ b/pkg/kafka/sarama_client.go
@@ -907,7 +907,10 @@
select {
case err, ok := <-consumer.Errors():
if ok {
- log.Warnw("partition-consumers-error", log.Fields{"error": err})
+ if sc.isLivenessError(err) {
+ sc.updateLiveness(false)
+ log.Warnw("partition-consumers-error", log.Fields{"error": err})
+ }
} else {
// Channel is closed
break startloop
@@ -919,6 +922,8 @@
break startloop
}
msgBody := msg.Value
+ sc.updateLiveness(true)
+ log.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
icm := &ic.InterContainerMessage{}
if err := proto.Unmarshal(msgBody, icm); err != nil {
log.Warnw("partition-invalid-message", log.Fields{"error": err})