VOL-2204 Throw fatal error after a consumer stop

Change-Id: I6e87a88ece2f7dc496d263ff0fc501bd8685cc72
diff --git a/go.mod b/go.mod
index ee81f64..321c0db 100644
--- a/go.mod
+++ b/go.mod
@@ -7,7 +7,7 @@
 	github.com/cenkalti/backoff/v3 v3.1.1
 	github.com/gogo/protobuf v1.3.1
 	github.com/golang/protobuf v1.3.2
-	github.com/opencord/voltha-lib-go/v2 v2.2.22
+	github.com/opencord/voltha-lib-go/v2 v2.2.23
 	github.com/opencord/voltha-protos/v2 v2.1.2
 	go.etcd.io/etcd v0.0.0-20190930204107-236ac2a90522
 	google.golang.org/grpc v1.25.1
diff --git a/go.sum b/go.sum
index d4bf8a1..29e55c1 100644
--- a/go.sum
+++ b/go.sum
@@ -196,8 +196,8 @@
 github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
 github.com/onsi/gomega v1.4.2 h1:3mYCb7aPxS/RU7TI1y4rkEn1oKmPRjNJLNEXgw7MH2I=
 github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
-github.com/opencord/voltha-lib-go/v2 v2.2.22 h1:NhRYDzcD3fsq5CL4Xn9BQZM1roJewXmk56R2PwLHfyk=
-github.com/opencord/voltha-lib-go/v2 v2.2.22/go.mod h1:CoY2amUEsbO2grCbJRk7G+Fl1Xb7vQLw3/uGLbTz0Ms=
+github.com/opencord/voltha-lib-go/v2 v2.2.23 h1:iDB6R0aAPeirqHaNAQDtGHtDyb9V/R73wAp2hrbVViQ=
+github.com/opencord/voltha-lib-go/v2 v2.2.23/go.mod h1:CoY2amUEsbO2grCbJRk7G+Fl1Xb7vQLw3/uGLbTz0Ms=
 github.com/opencord/voltha-protos/v2 v2.1.0/go.mod h1:6kOcfYi1CadWowFxI2SH5wLfHrsRECZLZlD2MFK6WDI=
 github.com/opencord/voltha-protos/v2 v2.1.2 h1:/eX+kXhANbzxTpBHgC6vjwBUGRKKvGUOQRDdDgROp9E=
 github.com/opencord/voltha-protos/v2 v2.1.2/go.mod h1:6kOcfYi1CadWowFxI2SH5wLfHrsRECZLZlD2MFK6WDI=
diff --git a/main.go b/main.go
index b9bff5f..0e2e186 100644
--- a/main.go
+++ b/main.go
@@ -201,11 +201,17 @@
 */
 func (a *adapter) checkKafkaReadiness(ctx context.Context) {
 	livelinessChannel := a.kafkaClient.EnableLivenessChannel(true)
+	healthinessChannel := a.kafkaClient.EnableHealthinessChannel(true)
 	timeout := a.config.LiveProbeInterval
 	for {
 		timeoutTimer := time.NewTimer(timeout)
 
 		select {
+		case healthiness := <-healthinessChannel:
+			if !healthiness {
+				// log.Fatal will call os.Exit(1) to terminate
+				log.Fatal("Kafka service has become unhealthy")
+			}
 		case liveliness := <-livelinessChannel:
 			if !liveliness {
 				// kafka not reachable or down, updating the status to not ready state
diff --git a/main_test.go b/main_test.go
index 362a135..f25d132 100644
--- a/main_test.go
+++ b/main_test.go
@@ -226,3 +226,7 @@
 func (kc *mockKafkaClient) EnableLivenessChannel(enable bool) chan bool {
 	return nil
 }
+
+func (kc *mockKafkaClient) EnableHealthinessChannel(enable bool) chan bool {
+	return nil
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/client.go b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/client.go
index 488bf9f..bda7ed9 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/client.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/client.go
@@ -69,4 +69,5 @@
 	Send(msg interface{}, topic *Topic, keys ...string) error
 	SendLiveness() error
 	EnableLivenessChannel(enable bool) chan bool
+	EnableHealthinessChannel(enable bool) chan bool
 }
diff --git a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/kafka_inter_container_library.go b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/kafka_inter_container_library.go
index 3326191..4e04b30 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/kafka_inter_container_library.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/kafka_inter_container_library.go
@@ -768,6 +768,10 @@
 	return kp.kafkaClient.EnableLivenessChannel(enable)
 }
 
+func (kp *InterContainerProxy) EnableHealthinessChannel(enable bool) chan bool {
+	return kp.kafkaClient.EnableHealthinessChannel(enable)
+}
+
 func (kp *InterContainerProxy) SendLiveness() error {
 	return kp.kafkaClient.SendLiveness()
 }
diff --git a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/sarama_client.go b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/sarama_client.go
index ff521a7..ca88dfd 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/sarama_client.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/sarama_client.go
@@ -81,6 +81,8 @@
 	livenessChannelInterval       time.Duration
 	lastLivenessTime              time.Time
 	started                       bool
+	healthy                       bool
+	healthiness                   chan bool
 }
 
 type SaramaClientOption func(*SaramaClient)
@@ -231,8 +233,9 @@
 	client.lockOfTopicLockMap = sync.RWMutex{}
 	client.lockOfGroupConsumers = sync.RWMutex{}
 
-	// alive until proven otherwise
+	// healthy and alive until proven otherwise
 	client.alive = true
+	client.healthy = true
 
 	return client
 }
@@ -484,6 +487,15 @@
 	}
 }
 
+// Once unhealthy, we never go back
+func (sc *SaramaClient) setUnhealthy() {
+	sc.healthy = false
+	if sc.healthiness != nil {
+		log.Infow("set-client-unhealthy", log.Fields{"healthy": sc.healthy})
+		sc.healthiness <- sc.healthy
+	}
+}
+
 func (sc *SaramaClient) isLivenessError(err error) bool {
 	// Sarama producers and consumers encapsulate the error inside
 	// a ProducerError or ConsumerError struct.
@@ -600,6 +612,30 @@
 	return sc.liveness
 }
 
+// Enable the Healthiness monitor channel. This channel will report "false"
+// if the kafka consumers die, or some other problem occurs which is
+// catastrophic that would require re-creating the client.
+func (sc *SaramaClient) EnableHealthinessChannel(enable bool) chan bool {
+	log.Infow("kafka-enable-healthiness-channel", log.Fields{"enable": enable})
+	if enable {
+		if sc.healthiness == nil {
+			log.Info("kafka-create-healthiness-channel")
+			// At least 1, so we can immediately post to it without blocking
+			// Setting a bigger number (10) allows the monitor to fall behind
+			// without blocking others. The monitor shouldn't really fall
+			// behind...
+			sc.healthiness = make(chan bool, 10)
+			// post intial state to the channel
+			sc.healthiness <- sc.healthy
+		}
+	} else {
+		// TODO: Think about whether we need the ability to turn off
+		// liveness monitoring
+		panic("Turning off healthiness reporting is not supported")
+	}
+	return sc.healthiness
+}
+
 // send an empty message on the liveness channel to check whether connectivity has
 // been restored.
 func (sc *SaramaClient) SendLiveness() error {
@@ -936,6 +972,7 @@
 		}
 	}
 	log.Infow("partition-consumer-stopped", log.Fields{"topic": topic.Name})
+	sc.setUnhealthy()
 }
 
 func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
@@ -979,6 +1016,7 @@
 		}
 	}
 	log.Infow("group-consumer-stopped", log.Fields{"topic": topic.Name})
+	sc.setUnhealthy()
 }
 
 func (sc *SaramaClient) startConsumers(topic *Topic) error {
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 174601f..0b40ae6 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -63,7 +63,7 @@
 github.com/mitchellh/go-homedir
 # github.com/mitchellh/mapstructure v1.1.2
 github.com/mitchellh/mapstructure
-# github.com/opencord/voltha-lib-go/v2 v2.2.22
+# github.com/opencord/voltha-lib-go/v2 v2.2.23
 github.com/opencord/voltha-lib-go/v2/pkg/adapters
 github.com/opencord/voltha-lib-go/v2/pkg/adapters/adapterif
 github.com/opencord/voltha-lib-go/v2/pkg/adapters/common