VOL-2232 restrict kafka readiness to connectivity-related errors
Change-Id: I31d3d2a8ded41bb762a9105781458a50f05ec8c3
diff --git a/VERSION b/VERSION
index c36c648..be067c7 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-2.2.17
+2.2.18-dev
diff --git a/pkg/kafka/sarama_client.go b/pkg/kafka/sarama_client.go
index a251c56..73025d9 100755
--- a/pkg/kafka/sarama_client.go
+++ b/pkg/kafka/sarama_client.go
@@ -16,10 +16,12 @@
package kafka
import (
+ "context"
"errors"
"fmt"
"github.com/Shopify/sarama"
scc "github.com/bsm/sarama-cluster"
+ "github.com/eapache/go-resiliency/breaker"
"github.com/golang/protobuf/proto"
"github.com/google/uuid"
"github.com/opencord/voltha-lib-go/v2/pkg/log"
@@ -482,6 +484,49 @@
}
}
+// isLivenessError reports whether err indicates a loss of connectivity to
+// Kafka, as opposed to an error that should leave the liveness state alone.
+//
+// The following are treated as liveness errors: a context deadline timeout,
+// sarama running out of brokers, a producer that is shutting down, an
+// unavailable controller, an open circuit breaker, and any error whose
+// message ends in "connection refused". Every other error is logged and
+// ignored.
+//
+// A nil error is never a liveness error. That includes a nil
+// *sarama.ProducerError or *sarama.ConsumerError (for example the zero value
+// received from a closed error channel) and a wrapper whose Err is nil.
+func (sc *SaramaClient) isLivenessError(err error) bool {
+	// Sarama producers and consumers encapsulate the error inside
+	// a ProducerError or ConsumerError struct. A typed nil pointer still
+	// matches its case here, so check it before dereferencing.
+	switch wrapped := err.(type) {
+	case *sarama.ProducerError:
+		if wrapped == nil {
+			return false
+		}
+		err = wrapped.Err
+	case *sarama.ConsumerError:
+		if wrapped == nil {
+			return false
+		}
+		err = wrapped.Err
+	}
+
+	// Nothing to classify; calling Error() on a nil error would panic.
+	if err == nil {
+		return false
+	}
+
+	// Sarama-Cluster composes the error into a ClusterError struct, which
+	// we can't compare by reference. The best we can do is compare the
+	// error strings.
+	msg := err.Error()
+
+	switch msg {
+	case context.DeadlineExceeded.Error():
+		log.Info("is-liveness-error-timeout")
+		return true
+	case sarama.ErrOutOfBrokers.Error(): // "Kafka: client has run out of available brokers"
+		log.Info("is-liveness-error-no-brokers")
+		return true
+	case sarama.ErrShuttingDown.Error(): // "Kafka: message received by producer in process of shutting down"
+		log.Info("is-liveness-error-shutting-down")
+		return true
+	case sarama.ErrControllerNotAvailable.Error(): // "Kafka: controller is not available"
+		log.Info("is-liveness-error-not-available")
+		return true
+	case breaker.ErrBreakerOpen.Error(): // "circuit breaker is open"
+		log.Info("is-liveness-error-circuit-breaker-open")
+		return true
+	}
+
+	if strings.HasSuffix(msg, "connection refused") { // "dial tcp 10.244.1.176:9092: connect: connection refused"
+		log.Info("is-liveness-error-connection-refused")
+		return true
+	}
+
+	// Other errors shouldn't trigger a loss of liveness
+	log.Infow("is-liveness-error-ignored", log.Fields{"err": err})
+
+	return false
+}
+
+
// send formats and sends the request onto the kafka messaging bus.
func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) error {
@@ -521,7 +566,7 @@
sc.updateLiveness(true)
case notOk := <-sc.producer.Errors():
log.Debugw("error-sending", log.Fields{"status": notOk})
- if strings.Contains(notOk.Error(), "Failed to produce") {
+ if sc.isLivenessError(notOk) {
sc.updateLiveness(false)
}
return notOk
@@ -577,7 +622,7 @@
sc.updateLiveness(true)
case notOk := <-sc.producer.Errors():
log.Debugw("liveness-error-sending", log.Fields{"status": notOk})
- if strings.Contains(notOk.Error(), "Failed to produce") {
+ if sc.isLivenessError(notOk) {
sc.updateLiveness(false)
}
return notOk
@@ -896,7 +941,9 @@
select {
case err, ok := <-consumer.Errors():
if ok {
- sc.updateLiveness(false)
+ if sc.isLivenessError(err) {
+ sc.updateLiveness(false)
+ }
log.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
} else {
log.Warnw("group-consumers-closed-err", log.Fields{"topic": topic.Name})