VOL-2232 restrict kafka readiness to connectivity-related errors

Change-Id: I31d3d2a8ded41bb762a9105781458a50f05ec8c3
diff --git a/VERSION b/VERSION
index c36c648..be067c7 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-2.2.17
+2.2.18-dev
diff --git a/pkg/kafka/sarama_client.go b/pkg/kafka/sarama_client.go
index a251c56..73025d9 100755
--- a/pkg/kafka/sarama_client.go
+++ b/pkg/kafka/sarama_client.go
@@ -16,10 +16,12 @@
 package kafka
 
 import (
+	"context"
 	"errors"
 	"fmt"
 	"github.com/Shopify/sarama"
 	scc "github.com/bsm/sarama-cluster"
+	"github.com/eapache/go-resiliency/breaker"
 	"github.com/golang/protobuf/proto"
 	"github.com/google/uuid"
 	"github.com/opencord/voltha-lib-go/v2/pkg/log"
@@ -482,6 +484,49 @@
 	}
 }
 
+func (sc *SaramaClient) isLivenessError(err error) bool {
+	// Sarama producers and consumers encapsulate the error inside
+	// a ProducerError or ConsumerError struct.
+	if prodError, ok := err.(*sarama.ProducerError); ok {
+		err = prodError.Err
+	} else if consumerError, ok := err.(*sarama.ConsumerError); ok {
+		err = consumerError.Err
+	}
+
+	// Sarama-Cluster will compose the error into a ClusterError struct,
+	// which we can't do a compare by reference. To handle that, we the
+	// best we can do is compare the error strings.
+
+	switch err.Error() {
+	case context.DeadlineExceeded.Error():
+		log.Info("is-liveness-error-timeout")
+		return true
+	case sarama.ErrOutOfBrokers.Error(): // "Kafka: client has run out of available brokers"
+		log.Info("is-liveness-error-no-brokers")
+		return true
+	case sarama.ErrShuttingDown.Error(): // "Kafka: message received by producer in process of shutting down"
+		log.Info("is-liveness-error-shutting-down")
+		return true
+	case sarama.ErrControllerNotAvailable.Error(): // "Kafka: controller is not available"
+		log.Info("is-liveness-error-not-available")
+		return true
+	case breaker.ErrBreakerOpen.Error(): // "circuit breaker is open"
+		log.Info("is-liveness-error-circuit-breaker-open")
+		return true
+	}
+
+	if strings.HasSuffix(err.Error(), "connection refused") { // "dial tcp 10.244.1.176:9092: connect: connection refused"
+		log.Info("is-liveness-error-connection-refused")
+		return true
+	}
+
+	// Other errors shouldn't trigger a loss of liveness
+
+	log.Infow("is-liveness-error-ignored", log.Fields{"err": err})
+
+	return false
+}
+
 // send formats and sends the request onto the kafka messaging bus.
 func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) error {
 
@@ -521,7 +566,7 @@
 		sc.updateLiveness(true)
 	case notOk := <-sc.producer.Errors():
 		log.Debugw("error-sending", log.Fields{"status": notOk})
-		if strings.Contains(notOk.Error(), "Failed to produce") {
+		if sc.isLivenessError(notOk) {
 			sc.updateLiveness(false)
 		}
 		return notOk
@@ -577,7 +622,7 @@
 		sc.updateLiveness(true)
 	case notOk := <-sc.producer.Errors():
 		log.Debugw("liveness-error-sending", log.Fields{"status": notOk})
-		if strings.Contains(notOk.Error(), "Failed to produce") {
+		if sc.isLivenessError(notOk) {
 			sc.updateLiveness(false)
 		}
 		return notOk
@@ -896,7 +941,9 @@
 		select {
 		case err, ok := <-consumer.Errors():
 			if ok {
-				sc.updateLiveness(false)
+				if sc.isLivenessError(err) {
+					sc.updateLiveness(false)
+				}
 				log.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
 			} else {
 				log.Warnw("group-consumers-closed-err", log.Fields{"topic": topic.Name})