VOL-2101 kubernetes liveliness changes
Help text modified
Change-Id: Ia824436d6c9c06710930dfbca1528c6366e1ef64
diff --git a/config/config.go b/config/config.go
index ba93d87..d575231 100644
--- a/config/config.go
+++ b/config/config.go
@@ -1,17 +1,17 @@
/*
- * Copyright 2018-present Open Networking Foundation
+* Copyright 2018-present Open Networking Foundation
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
+* http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
*/
//Package config provides the Log, kvstore, Kafka configuration
@@ -22,52 +22,57 @@
"fmt"
"github.com/opencord/voltha-lib-go/v2/pkg/log"
"os"
+ "time"
)
// Open OLT default constants
const (
- EtcdStoreName = "etcd"
- defaultInstanceid = "openOlt001"
- defaultKafkaadapterhost = "127.0.0.1"
- defaultKafkaadapterport = 9092
- defaultKafkaclusterhost = "127.0.0.1"
- defaultKafkaclusterport = 9094
- defaultKvstoretype = EtcdStoreName
- defaultKvstoretimeout = 5 //in seconds
- defaultKvstorehost = "127.0.0.1"
- defaultKvstoreport = 2379 // Consul = 8500; Etcd = 2379
- defaultLoglevel = 0
- defaultBanner = false
- defaultDisplayVersionOnly = false
- defaultTopic = "openolt"
- defaultCoretopic = "rwcore"
- defaultEventtopic = "voltha.events"
- defaultOnunumber = 1
- defaultProbeHost = ""
- defaultProbePort = 8080
+ EtcdStoreName = "etcd"
+ defaultInstanceid = "openOlt001"
+ defaultKafkaadapterhost = "127.0.0.1"
+ defaultKafkaadapterport = 9092
+ defaultKafkaclusterhost = "127.0.0.1"
+ defaultKafkaclusterport = 9094
+ defaultKvstoretype = EtcdStoreName
+ defaultKvstoretimeout = 5 //in seconds
+ defaultKvstorehost = "127.0.0.1"
+ defaultKvstoreport = 2379 // Consul = 8500; Etcd = 2379
+ defaultLoglevel = 0
+ defaultBanner = false
+ defaultDisplayVersionOnly = false
+ defaultTopic = "openolt"
+ defaultCoretopic = "rwcore"
+ defaultEventtopic = "voltha.events"
+ defaultOnunumber = 1
+ defaultProbeHost = ""
+ defaultProbePort = 8080
+ defaultLiveProbeInterval = 60 * time.Second
+ defaultNotLiveProbeInterval = 5 * time.Second // Probe more frequently when not alive
)
// AdapterFlags represents the set of configurations used by the read-write adaptercore service
type AdapterFlags struct {
// Command line parameters
- InstanceID string
- KafkaAdapterHost string
- KafkaAdapterPort int
- KafkaClusterHost string
- KafkaClusterPort int
- KVStoreType string
- KVStoreTimeout int // in seconds
- KVStoreHost string
- KVStorePort int
- Topic string
- CoreTopic string
- EventTopic string
- LogLevel int
- OnuNumber int
- Banner bool
- DisplayVersionOnly bool
- ProbeHost string
- ProbePort int
+ InstanceID string
+ KafkaAdapterHost string
+ KafkaAdapterPort int
+ KafkaClusterHost string
+ KafkaClusterPort int
+ KVStoreType string
+ KVStoreTimeout int // in seconds
+ KVStoreHost string
+ KVStorePort int
+ Topic string
+ CoreTopic string
+ EventTopic string
+ LogLevel int
+ OnuNumber int
+ Banner bool
+ DisplayVersionOnly bool
+ ProbeHost string
+ ProbePort int
+ LiveProbeInterval time.Duration
+ NotLiveProbeInterval time.Duration
}
func init() {
@@ -77,24 +82,26 @@
// NewAdapterFlags returns a new RWCore config
func NewAdapterFlags() *AdapterFlags {
var adapterFlags = AdapterFlags{ // Default values
- InstanceID: defaultInstanceid,
- KafkaAdapterHost: defaultKafkaadapterhost,
- KafkaAdapterPort: defaultKafkaadapterport,
- KafkaClusterHost: defaultKafkaclusterhost,
- KafkaClusterPort: defaultKafkaclusterport,
- KVStoreType: defaultKvstoretype,
- KVStoreTimeout: defaultKvstoretimeout,
- KVStoreHost: defaultKvstorehost,
- KVStorePort: defaultKvstoreport,
- Topic: defaultTopic,
- CoreTopic: defaultCoretopic,
- EventTopic: defaultEventtopic,
- LogLevel: defaultLoglevel,
- OnuNumber: defaultOnunumber,
- Banner: defaultBanner,
- DisplayVersionOnly: defaultDisplayVersionOnly,
- ProbeHost: defaultProbeHost,
- ProbePort: defaultProbePort,
+ InstanceID: defaultInstanceid,
+ KafkaAdapterHost: defaultKafkaadapterhost,
+ KafkaAdapterPort: defaultKafkaadapterport,
+ KafkaClusterHost: defaultKafkaclusterhost,
+ KafkaClusterPort: defaultKafkaclusterport,
+ KVStoreType: defaultKvstoretype,
+ KVStoreTimeout: defaultKvstoretimeout,
+ KVStoreHost: defaultKvstorehost,
+ KVStorePort: defaultKvstoreport,
+ Topic: defaultTopic,
+ CoreTopic: defaultCoretopic,
+ EventTopic: defaultEventtopic,
+ LogLevel: defaultLoglevel,
+ OnuNumber: defaultOnunumber,
+ Banner: defaultBanner,
+ DisplayVersionOnly: defaultDisplayVersionOnly,
+ ProbeHost: defaultProbeHost,
+ ProbePort: defaultProbePort,
+ LiveProbeInterval: defaultLiveProbeInterval,
+ NotLiveProbeInterval: defaultNotLiveProbeInterval,
}
return &adapterFlags
}
@@ -153,6 +160,12 @@
help = fmt.Sprintf("The port on which to listen to answer liveness and readiness probe queries over HTTP.")
flag.IntVar(&(so.ProbePort), "probe_port", defaultProbePort, help)
+ help = fmt.Sprintf("Number of seconds for the default liveliness check")
+ flag.DurationVar(&(so.LiveProbeInterval), "live_probe_interval", defaultLiveProbeInterval, help)
+
+ help = fmt.Sprintf("Number of seconds for liveliness check if probe is not running")
+ flag.DurationVar(&(so.NotLiveProbeInterval), "not_live_probe_interval", defaultNotLiveProbeInterval, help)
+
flag.Parse()
containerName := getContainerInfo()
diff --git a/go.mod b/go.mod
index 39e0de0..92454eb 100644
--- a/go.mod
+++ b/go.mod
@@ -5,7 +5,7 @@
require (
github.com/gogo/protobuf v1.3.1
github.com/golang/protobuf v1.3.2
- github.com/opencord/voltha-lib-go/v2 v2.2.19
+ github.com/opencord/voltha-lib-go/v2 v2.2.20
github.com/opencord/voltha-protos/v2 v2.1.2
go.etcd.io/etcd v0.0.0-20190930204107-236ac2a90522
google.golang.org/grpc v1.25.1
diff --git a/go.sum b/go.sum
index b23cb51..9595ab8 100644
--- a/go.sum
+++ b/go.sum
@@ -192,8 +192,8 @@
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.2 h1:3mYCb7aPxS/RU7TI1y4rkEn1oKmPRjNJLNEXgw7MH2I=
github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
-github.com/opencord/voltha-lib-go/v2 v2.2.19 h1:t5feQyT+/EmE1nuS1uRIDKLITvaLIaBtX6NsMTfVxs8=
-github.com/opencord/voltha-lib-go/v2 v2.2.19/go.mod h1:CoY2amUEsbO2grCbJRk7G+Fl1Xb7vQLw3/uGLbTz0Ms=
+github.com/opencord/voltha-lib-go/v2 v2.2.20 h1:Vo1TjjlErN2LZmknn0XS+dsIrjulRv/fFa5r7Y8mhjE=
+github.com/opencord/voltha-lib-go/v2 v2.2.20/go.mod h1:CoY2amUEsbO2grCbJRk7G+Fl1Xb7vQLw3/uGLbTz0Ms=
github.com/opencord/voltha-protos/v2 v2.1.0/go.mod h1:6kOcfYi1CadWowFxI2SH5wLfHrsRECZLZlD2MFK6WDI=
github.com/opencord/voltha-protos/v2 v2.1.2 h1:/eX+kXhANbzxTpBHgC6vjwBUGRKKvGUOQRDdDgROp9E=
github.com/opencord/voltha-protos/v2 v2.1.2/go.mod h1:6kOcfYi1CadWowFxI2SH5wLfHrsRECZLZlD2MFK6WDI=
diff --git a/main.go b/main.go
index 2ffa534..b9bff5f 100644
--- a/main.go
+++ b/main.go
@@ -1,17 +1,17 @@
/*
- * Copyright 2018-present Open Networking Foundation
+* Copyright 2018-present Open Networking Foundation
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
+* http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
*/
//Package main invokes the application
@@ -61,13 +61,6 @@
_, _ = log.AddPackage(log.JSON, log.DebugLevel, nil)
}
-const (
- // DefaultLivelinessCheck to check the liveliness
- DefaultLivelinessCheck = 30 * time.Second
- // DefaultTimeout to close the connection
- DefaultTimeout = 10
-)
-
func newAdapter(cf *config.AdapterFlags) *adapter {
var a adapter
a.instanceID = cf.InstanceID
@@ -146,35 +139,97 @@
log.Fatal("error-registering-with-core")
}
- // go routine to check the readiness and liveliness and update the probe status
- go a.checKServicesReadiness(ctx)
+ // check the readiness and liveliness and update the probe status
+ a.checkServicesReadiness(ctx)
}
/**
This function checks the liveliness and readiness of the kakfa and kv-client services
and update the status in the probe.
*/
-func (a *adapter) checKServicesReadiness(ctx context.Context) {
+func (a *adapter) checkServicesReadiness(ctx context.Context) {
+ // checks the kafka readiness
+ go a.checkKafkaReadiness(ctx)
+
+ // checks the kv-store readiness
+ go a.checkKvStoreReadiness(ctx)
+}
+
+/**
+This function checks the liveliness and readiness of the kv-store service
+and update the status in the probe.
+*/
+func (a *adapter) checkKvStoreReadiness(ctx context.Context) {
+ // dividing the live probe interval by 2 to get updated status every 30s
+ timeout := a.config.LiveProbeInterval / 2
+ kvStoreChannel := make(chan bool, 1)
+
+ // Default false to check the liveliness.
+ kvStoreChannel <- false
for {
- // checks the connectivity of the kv-client
- if a.kvClient.IsConnectionUp(DefaultTimeout) {
- probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusRunning)
- log.Debugf("kv- store is reachable")
- } else {
- probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusNotReady)
- log.Debugf("kv-store is not reachable")
+ timeoutTimer := time.NewTimer(timeout)
+ select {
+ case liveliness := <-kvStoreChannel:
+ if !liveliness {
+ // kv-store not reachable or down, updating the status to not ready state
+ probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusNotReady)
+ timeout = a.config.NotLiveProbeInterval
+ } else {
+ // kv-store is reachable , updating the status to running state
+ probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusRunning)
+ timeout = a.config.LiveProbeInterval / 2
+ }
+ // Check if the timer has expired or not
+ if !timeoutTimer.Stop() {
+ <-timeoutTimer.C
+ }
+ case <-timeoutTimer.C:
+ // Check the status of the kv-store
+ log.Info("kv-store liveliness-recheck")
+ if a.kvClient.IsConnectionUp(a.config.KVStoreTimeout) {
+ kvStoreChannel <- true
+ } else {
+ kvStoreChannel <- false
+ }
}
- //checks the connectivity of the kafka
- a.kafkaClient.SendLiveness()
- isKafkaReady := <-a.kafkaClient.EnableLivenessChannel(true)
- if isKafkaReady {
- probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusRunning)
- log.Debugf("kafka is reachable")
- } else {
- probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
- log.Debugf("kafka is not reachable")
+ }
+}
+
+/**
+This function checks the liveliness and readiness of the kafka service
+and update the status in the probe.
+*/
+func (a *adapter) checkKafkaReadiness(ctx context.Context) {
+ livelinessChannel := a.kafkaClient.EnableLivenessChannel(true)
+ timeout := a.config.LiveProbeInterval
+ for {
+ timeoutTimer := time.NewTimer(timeout)
+
+ select {
+ case liveliness := <-livelinessChannel:
+ if !liveliness {
+ // kafka not reachable or down, updating the status to not ready state
+ probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
+ timeout = a.config.NotLiveProbeInterval
+ } else {
+ // kafka is reachable , updating the status to running state
+ probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusRunning)
+ timeout = a.config.LiveProbeInterval
+ }
+ // Check if the timer has expired or not
+ if !timeoutTimer.Stop() {
+ <-timeoutTimer.C
+ }
+ case <-timeoutTimer.C:
+ log.Info("kafka-proxy-liveness-recheck")
+ // send the liveness probe in a goroutine; we don't want to deadlock ourselves as
+ // the liveness probe may wait (and block) writing to our channel.
+ err := a.kafkaClient.SendLiveness()
+ if err != nil {
+ // Catch possible error case if sending liveness after Sarama has been stopped.
+ log.Warnw("error-kafka-send-liveness", log.Fields{"error": err})
+ }
}
- time.Sleep(DefaultLivelinessCheck)
}
}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/sarama_client.go b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/sarama_client.go
index 73025d9..ff521a7 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/sarama_client.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/sarama_client.go
@@ -907,7 +907,10 @@
select {
case err, ok := <-consumer.Errors():
if ok {
- log.Warnw("partition-consumers-error", log.Fields{"error": err})
+ if sc.isLivenessError(err) {
+ sc.updateLiveness(false)
+ log.Warnw("partition-consumers-error", log.Fields{"error": err})
+ }
} else {
// Channel is closed
break startloop
@@ -919,6 +922,8 @@
break startloop
}
msgBody := msg.Value
+ sc.updateLiveness(true)
+ log.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
icm := &ic.InterContainerMessage{}
if err := proto.Unmarshal(msgBody, icm); err != nil {
log.Warnw("partition-invalid-message", log.Fields{"error": err})
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 237ca43..a43764b 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -59,7 +59,7 @@
github.com/mitchellh/go-homedir
# github.com/mitchellh/mapstructure v1.1.2
github.com/mitchellh/mapstructure
-# github.com/opencord/voltha-lib-go/v2 v2.2.19
+# github.com/opencord/voltha-lib-go/v2 v2.2.20
github.com/opencord/voltha-lib-go/v2/pkg/adapters
github.com/opencord/voltha-lib-go/v2/pkg/adapters/adapterif
github.com/opencord/voltha-lib-go/v2/pkg/adapters/common