VOL-2224 Event Filter Mechanism

         This commit contains the initial changes required for the event
         filtering so that when the protos get merged the rw and ro core
         can be built without compilation errors

         Note: This patchset and the patchset for voltha-protos need to be merged
               as close together as possible; otherwise there will be compilation errors

Change-Id: Id092fa19a0b302a33176a82e41b92a36ea8ede29
diff --git a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/sarama_client.go b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/sarama_client.go
index a251c56..73025d9 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/sarama_client.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/sarama_client.go
@@ -16,10 +16,12 @@
 package kafka
 
 import (
+	"context"
 	"errors"
 	"fmt"
 	"github.com/Shopify/sarama"
 	scc "github.com/bsm/sarama-cluster"
+	"github.com/eapache/go-resiliency/breaker"
 	"github.com/golang/protobuf/proto"
 	"github.com/google/uuid"
 	"github.com/opencord/voltha-lib-go/v2/pkg/log"
@@ -482,6 +484,49 @@
 	}
 }
 
+func (sc *SaramaClient) isLivenessError(err error) bool {
+	// Reports whether err should count as a loss of liveness. Sarama producers and
+	// consumers wrap the error in a ProducerError or ConsumerError struct; unwrap it.
+	if prodError, ok := err.(*sarama.ProducerError); ok {
+		err = prodError.Err
+	} else if consumerError, ok := err.(*sarama.ConsumerError); ok {
+		err = consumerError.Err
+	}
+
+	// Sarama-Cluster will compose the error into a ClusterError struct,
+	// which we can't compare by reference. To handle that, the best we
+	// can do is compare the error strings.
+
+	switch err.Error() {
+	case context.DeadlineExceeded.Error():
+		log.Info("is-liveness-error-timeout")
+		return true
+	case sarama.ErrOutOfBrokers.Error(): // "Kafka: client has run out of available brokers"
+		log.Info("is-liveness-error-no-brokers")
+		return true
+	case sarama.ErrShuttingDown.Error(): // "Kafka: message received by producer in process of shutting down"
+		log.Info("is-liveness-error-shutting-down")
+		return true
+	case sarama.ErrControllerNotAvailable.Error(): // "Kafka: controller is not available"
+		log.Info("is-liveness-error-not-available")
+		return true
+	case breaker.ErrBreakerOpen.Error(): // "circuit breaker is open"
+		log.Info("is-liveness-error-circuit-breaker-open")
+		return true
+	}
+
+	if strings.HasSuffix(err.Error(), "connection refused") { // "dial tcp 10.244.1.176:9092: connect: connection refused"
+		log.Info("is-liveness-error-connection-refused")
+		return true
+	}
+
+	// Any other error is logged and ignored: it shouldn't trigger a loss of liveness.
+
+	log.Infow("is-liveness-error-ignored", log.Fields{"err": err})
+
+	return false
+}
+
 // send formats and sends the request onto the kafka messaging bus.
 func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) error {
 
@@ -521,7 +566,7 @@
 		sc.updateLiveness(true)
 	case notOk := <-sc.producer.Errors():
 		log.Debugw("error-sending", log.Fields{"status": notOk})
-		if strings.Contains(notOk.Error(), "Failed to produce") {
+		if sc.isLivenessError(notOk) {
 			sc.updateLiveness(false)
 		}
 		return notOk
@@ -577,7 +622,7 @@
 		sc.updateLiveness(true)
 	case notOk := <-sc.producer.Errors():
 		log.Debugw("liveness-error-sending", log.Fields{"status": notOk})
-		if strings.Contains(notOk.Error(), "Failed to produce") {
+		if sc.isLivenessError(notOk) {
 			sc.updateLiveness(false)
 		}
 		return notOk
@@ -896,7 +941,9 @@
 		select {
 		case err, ok := <-consumer.Errors():
 			if ok {
-				sc.updateLiveness(false)
+				if sc.isLivenessError(err) {
+					sc.updateLiveness(false)
+				}
 				log.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
 			} else {
 				log.Warnw("group-consumers-closed-err", log.Fields{"topic": topic.Name})