VOL-2101 kubernetes liveliness changes
Help text modified
Change-Id: Ia824436d6c9c06710930dfbca1528c6366e1ef64
diff --git a/config/config.go b/config/config.go
index ba93d87..d575231 100644
--- a/config/config.go
+++ b/config/config.go
@@ -1,17 +1,17 @@
 /*
- * Copyright 2018-present Open Networking Foundation
+* Copyright 2018-present Open Networking Foundation
 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
 
- * http://www.apache.org/licenses/LICENSE-2.0
+* http://www.apache.org/licenses/LICENSE-2.0
 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
  */
 
 //Package config provides the Log, kvstore, Kafka configuration
@@ -22,52 +22,57 @@
 	"fmt"
 	"github.com/opencord/voltha-lib-go/v2/pkg/log"
 	"os"
+	"time"
 )
 
 // Open OLT default constants
 const (
-	EtcdStoreName             = "etcd"
-	defaultInstanceid         = "openOlt001"
-	defaultKafkaadapterhost   = "127.0.0.1"
-	defaultKafkaadapterport   = 9092
-	defaultKafkaclusterhost   = "127.0.0.1"
-	defaultKafkaclusterport   = 9094
-	defaultKvstoretype        = EtcdStoreName
-	defaultKvstoretimeout     = 5 //in seconds
-	defaultKvstorehost        = "127.0.0.1"
-	defaultKvstoreport        = 2379 // Consul = 8500; Etcd = 2379
-	defaultLoglevel           = 0
-	defaultBanner             = false
-	defaultDisplayVersionOnly = false
-	defaultTopic              = "openolt"
-	defaultCoretopic          = "rwcore"
-	defaultEventtopic         = "voltha.events"
-	defaultOnunumber          = 1
-	defaultProbeHost          = ""
-	defaultProbePort          = 8080
+	EtcdStoreName               = "etcd"
+	defaultInstanceid           = "openOlt001"
+	defaultKafkaadapterhost     = "127.0.0.1"
+	defaultKafkaadapterport     = 9092
+	defaultKafkaclusterhost     = "127.0.0.1"
+	defaultKafkaclusterport     = 9094
+	defaultKvstoretype          = EtcdStoreName
+	defaultKvstoretimeout       = 5 //in seconds
+	defaultKvstorehost          = "127.0.0.1"
+	defaultKvstoreport          = 2379 // Consul = 8500; Etcd = 2379
+	defaultLoglevel             = 0
+	defaultBanner               = false
+	defaultDisplayVersionOnly   = false
+	defaultTopic                = "openolt"
+	defaultCoretopic            = "rwcore"
+	defaultEventtopic           = "voltha.events"
+	defaultOnunumber            = 1
+	defaultProbeHost            = ""
+	defaultProbePort            = 8080
+	defaultLiveProbeInterval    = 60 * time.Second
+	defaultNotLiveProbeInterval = 5 * time.Second // Probe more frequently when not alive
 )
 
 // AdapterFlags represents the set of configurations used by the read-write adaptercore service
 type AdapterFlags struct {
 	// Command line parameters
-	InstanceID         string
-	KafkaAdapterHost   string
-	KafkaAdapterPort   int
-	KafkaClusterHost   string
-	KafkaClusterPort   int
-	KVStoreType        string
-	KVStoreTimeout     int // in seconds
-	KVStoreHost        string
-	KVStorePort        int
-	Topic              string
-	CoreTopic          string
-	EventTopic         string
-	LogLevel           int
-	OnuNumber          int
-	Banner             bool
-	DisplayVersionOnly bool
-	ProbeHost          string
-	ProbePort          int
+	InstanceID           string
+	KafkaAdapterHost     string
+	KafkaAdapterPort     int
+	KafkaClusterHost     string
+	KafkaClusterPort     int
+	KVStoreType          string
+	KVStoreTimeout       int // in seconds
+	KVStoreHost          string
+	KVStorePort          int
+	Topic                string
+	CoreTopic            string
+	EventTopic           string
+	LogLevel             int
+	OnuNumber            int
+	Banner               bool
+	DisplayVersionOnly   bool
+	ProbeHost            string
+	ProbePort            int
+	LiveProbeInterval    time.Duration
+	NotLiveProbeInterval time.Duration
 }
 
 func init() {
@@ -77,24 +82,26 @@
 // NewAdapterFlags returns a new RWCore config
 func NewAdapterFlags() *AdapterFlags {
 	var adapterFlags = AdapterFlags{ // Default values
-		InstanceID:         defaultInstanceid,
-		KafkaAdapterHost:   defaultKafkaadapterhost,
-		KafkaAdapterPort:   defaultKafkaadapterport,
-		KafkaClusterHost:   defaultKafkaclusterhost,
-		KafkaClusterPort:   defaultKafkaclusterport,
-		KVStoreType:        defaultKvstoretype,
-		KVStoreTimeout:     defaultKvstoretimeout,
-		KVStoreHost:        defaultKvstorehost,
-		KVStorePort:        defaultKvstoreport,
-		Topic:              defaultTopic,
-		CoreTopic:          defaultCoretopic,
-		EventTopic:         defaultEventtopic,
-		LogLevel:           defaultLoglevel,
-		OnuNumber:          defaultOnunumber,
-		Banner:             defaultBanner,
-		DisplayVersionOnly: defaultDisplayVersionOnly,
-		ProbeHost:          defaultProbeHost,
-		ProbePort:          defaultProbePort,
+		InstanceID:           defaultInstanceid,
+		KafkaAdapterHost:     defaultKafkaadapterhost,
+		KafkaAdapterPort:     defaultKafkaadapterport,
+		KafkaClusterHost:     defaultKafkaclusterhost,
+		KafkaClusterPort:     defaultKafkaclusterport,
+		KVStoreType:          defaultKvstoretype,
+		KVStoreTimeout:       defaultKvstoretimeout,
+		KVStoreHost:          defaultKvstorehost,
+		KVStorePort:          defaultKvstoreport,
+		Topic:                defaultTopic,
+		CoreTopic:            defaultCoretopic,
+		EventTopic:           defaultEventtopic,
+		LogLevel:             defaultLoglevel,
+		OnuNumber:            defaultOnunumber,
+		Banner:               defaultBanner,
+		DisplayVersionOnly:   defaultDisplayVersionOnly,
+		ProbeHost:            defaultProbeHost,
+		ProbePort:            defaultProbePort,
+		LiveProbeInterval:    defaultLiveProbeInterval,
+		NotLiveProbeInterval: defaultNotLiveProbeInterval,
 	}
 	return &adapterFlags
 }
@@ -153,6 +160,12 @@
 	help = fmt.Sprintf("The port on which to listen to answer liveness and readiness probe queries over HTTP.")
 	flag.IntVar(&(so.ProbePort), "probe_port", defaultProbePort, help)
 
+	help = fmt.Sprintf("Number of seconds for the default liveliness check")
+	flag.DurationVar(&(so.LiveProbeInterval), "live_probe_interval", defaultLiveProbeInterval, help)
+
+	help = fmt.Sprintf("Number of seconds for liveliness check if probe is not running")
+	flag.DurationVar(&(so.NotLiveProbeInterval), "not_live_probe_interval", defaultNotLiveProbeInterval, help)
+
 	flag.Parse()
 
 	containerName := getContainerInfo()
diff --git a/go.mod b/go.mod
index 39e0de0..92454eb 100644
--- a/go.mod
+++ b/go.mod
@@ -5,7 +5,7 @@
 require (
 	github.com/gogo/protobuf v1.3.1
 	github.com/golang/protobuf v1.3.2
-	github.com/opencord/voltha-lib-go/v2 v2.2.19
+	github.com/opencord/voltha-lib-go/v2 v2.2.20
 	github.com/opencord/voltha-protos/v2 v2.1.2
 	go.etcd.io/etcd v0.0.0-20190930204107-236ac2a90522
 	google.golang.org/grpc v1.25.1
diff --git a/go.sum b/go.sum
index b23cb51..9595ab8 100644
--- a/go.sum
+++ b/go.sum
@@ -192,8 +192,8 @@
 github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
 github.com/onsi/gomega v1.4.2 h1:3mYCb7aPxS/RU7TI1y4rkEn1oKmPRjNJLNEXgw7MH2I=
 github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
-github.com/opencord/voltha-lib-go/v2 v2.2.19 h1:t5feQyT+/EmE1nuS1uRIDKLITvaLIaBtX6NsMTfVxs8=
-github.com/opencord/voltha-lib-go/v2 v2.2.19/go.mod h1:CoY2amUEsbO2grCbJRk7G+Fl1Xb7vQLw3/uGLbTz0Ms=
+github.com/opencord/voltha-lib-go/v2 v2.2.20 h1:Vo1TjjlErN2LZmknn0XS+dsIrjulRv/fFa5r7Y8mhjE=
+github.com/opencord/voltha-lib-go/v2 v2.2.20/go.mod h1:CoY2amUEsbO2grCbJRk7G+Fl1Xb7vQLw3/uGLbTz0Ms=
 github.com/opencord/voltha-protos/v2 v2.1.0/go.mod h1:6kOcfYi1CadWowFxI2SH5wLfHrsRECZLZlD2MFK6WDI=
 github.com/opencord/voltha-protos/v2 v2.1.2 h1:/eX+kXhANbzxTpBHgC6vjwBUGRKKvGUOQRDdDgROp9E=
 github.com/opencord/voltha-protos/v2 v2.1.2/go.mod h1:6kOcfYi1CadWowFxI2SH5wLfHrsRECZLZlD2MFK6WDI=
diff --git a/main.go b/main.go
index 2ffa534..b9bff5f 100644
--- a/main.go
+++ b/main.go
@@ -1,17 +1,17 @@
 /*
- * Copyright 2018-present Open Networking Foundation
+* Copyright 2018-present Open Networking Foundation
 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
 
- * http://www.apache.org/licenses/LICENSE-2.0
+* http://www.apache.org/licenses/LICENSE-2.0
 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
  */
 
 //Package main invokes the application
@@ -61,13 +61,6 @@
 	_, _ = log.AddPackage(log.JSON, log.DebugLevel, nil)
 }
 
-const (
-	// DefaultLivelinessCheck to check the liveliness
-	DefaultLivelinessCheck = 30 * time.Second
-	// DefaultTimeout to close the connection
-	DefaultTimeout = 10
-)
-
 func newAdapter(cf *config.AdapterFlags) *adapter {
 	var a adapter
 	a.instanceID = cf.InstanceID
@@ -146,35 +139,97 @@
 		log.Fatal("error-registering-with-core")
 	}
 
-	// go routine to check the readiness and liveliness and update the probe status
-	go a.checKServicesReadiness(ctx)
+	// check the readiness and liveliness and update the probe status
+	a.checkServicesReadiness(ctx)
 }
 
 /**
 This function checks the liveliness and readiness of the kakfa and kv-client services
 and update the status in the probe.
 */
-func (a *adapter) checKServicesReadiness(ctx context.Context) {
+func (a *adapter) checkServicesReadiness(ctx context.Context) {
+	// checks the kafka readiness
+	go a.checkKafkaReadiness(ctx)
+
+	// checks the kv-store readiness
+	go a.checkKvStoreReadiness(ctx)
+}
+
+/**
+This function checks the liveliness and readiness of the kv-store service
+and update the status in the probe.
+*/
+func (a *adapter) checkKvStoreReadiness(ctx context.Context) {
+	// dividing the live probe interval by 2 to get updated status every 30s
+	timeout := a.config.LiveProbeInterval / 2
+	kvStoreChannel := make(chan bool, 1)
+
+	// Default false to check the liveliness.
+	kvStoreChannel <- false
 	for {
-		// checks the connectivity of the kv-client
-		if a.kvClient.IsConnectionUp(DefaultTimeout) {
-			probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusRunning)
-			log.Debugf("kv- store is reachable")
-		} else {
-			probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusNotReady)
-			log.Debugf("kv-store is  not reachable")
+		timeoutTimer := time.NewTimer(timeout)
+		select {
+		case liveliness := <-kvStoreChannel:
+			if !liveliness {
+				// kv-store not reachable or down, updating the status to not ready state
+				probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusNotReady)
+				timeout = a.config.NotLiveProbeInterval
+			} else {
+				// kv-store is reachable , updating the status to running state
+				probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusRunning)
+				timeout = a.config.LiveProbeInterval / 2
+			}
+			// Check if the timer has expired or not
+			if !timeoutTimer.Stop() {
+				<-timeoutTimer.C
+			}
+		case <-timeoutTimer.C:
+			// Check the status of the kv-store
+			log.Info("kv-store liveliness-recheck")
+			if a.kvClient.IsConnectionUp(a.config.KVStoreTimeout) {
+				kvStoreChannel <- true
+			} else {
+				kvStoreChannel <- false
+			}
 		}
-		//checks the connectivity of the kafka
-		a.kafkaClient.SendLiveness()
-		isKafkaReady := <-a.kafkaClient.EnableLivenessChannel(true)
-		if isKafkaReady {
-			probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusRunning)
-			log.Debugf("kafka is reachable")
-		} else {
-			probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
-			log.Debugf("kafka is  not reachable")
+	}
+}
+
+/**
+This function checks the liveliness and readiness of the kafka service
+and update the status in the probe.
+*/
+func (a *adapter) checkKafkaReadiness(ctx context.Context) {
+	livelinessChannel := a.kafkaClient.EnableLivenessChannel(true)
+	timeout := a.config.LiveProbeInterval
+	for {
+		timeoutTimer := time.NewTimer(timeout)
+
+		select {
+		case liveliness := <-livelinessChannel:
+			if !liveliness {
+				// kafka not reachable or down, updating the status to not ready state
+				probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
+				timeout = a.config.NotLiveProbeInterval
+			} else {
+				// kafka is reachable , updating the status to running state
+				probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusRunning)
+				timeout = a.config.LiveProbeInterval
+			}
+			// Check if the timer has expired or not
+			if !timeoutTimer.Stop() {
+				<-timeoutTimer.C
+			}
+		case <-timeoutTimer.C:
+			log.Info("kafka-proxy-liveness-recheck")
+			// send the liveness probe in a goroutine; we don't want to deadlock ourselves as
+			// the liveness probe may wait (and block) writing to our channel.
+			err := a.kafkaClient.SendLiveness()
+			if err != nil {
+				// Catch possible error case if sending liveness after Sarama has been stopped.
+				log.Warnw("error-kafka-send-liveness", log.Fields{"error": err})
+			}
 		}
-		time.Sleep(DefaultLivelinessCheck)
 	}
 }
 
diff --git a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/sarama_client.go b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/sarama_client.go
index 73025d9..ff521a7 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/sarama_client.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/sarama_client.go
@@ -907,7 +907,10 @@
 		select {
 		case err, ok := <-consumer.Errors():
 			if ok {
-				log.Warnw("partition-consumers-error", log.Fields{"error": err})
+				if sc.isLivenessError(err) {
+					sc.updateLiveness(false)
+					log.Warnw("partition-consumers-error", log.Fields{"error": err})
+				}
 			} else {
 				// Channel is closed
 				break startloop
@@ -919,6 +922,8 @@
 				break startloop
 			}
 			msgBody := msg.Value
+			sc.updateLiveness(true)
+			log.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
 			icm := &ic.InterContainerMessage{}
 			if err := proto.Unmarshal(msgBody, icm); err != nil {
 				log.Warnw("partition-invalid-message", log.Fields{"error": err})
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 237ca43..a43764b 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -59,7 +59,7 @@
 github.com/mitchellh/go-homedir
 # github.com/mitchellh/mapstructure v1.1.2
 github.com/mitchellh/mapstructure
-# github.com/opencord/voltha-lib-go/v2 v2.2.19
+# github.com/opencord/voltha-lib-go/v2 v2.2.20
 github.com/opencord/voltha-lib-go/v2/pkg/adapters
 github.com/opencord/voltha-lib-go/v2/pkg/adapters/adapterif
 github.com/opencord/voltha-lib-go/v2/pkg/adapters/common