VOL-2101 readiness and liveliness check added to message-bus and etcd
updated vendor folder
Change-Id: I0c1b983bf47e2740e0a24ea0c3a05a803caff15c
updated testcases
diff --git a/go.mod b/go.mod
index c57d9fa..dccec49 100644
--- a/go.mod
+++ b/go.mod
@@ -7,7 +7,7 @@
github.com/golang/protobuf v1.3.2
github.com/onsi/ginkgo v1.8.0 // indirect
github.com/onsi/gomega v1.5.0 // indirect
- github.com/opencord/voltha-lib-go/v2 v2.2.10
+ github.com/opencord/voltha-lib-go/v2 v2.2.13
github.com/opencord/voltha-protos/v2 v2.0.1
go.etcd.io/etcd v0.0.0-20190930204107-236ac2a90522
google.golang.org/grpc v1.24.0
diff --git a/go.sum b/go.sum
index 1631883..3cfac10 100644
--- a/go.sum
+++ b/go.sum
@@ -190,8 +190,8 @@
github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.5.0 h1:izbySO9zDPmjJ8rDjLvkA2zJHIo+HkYXHnf7eN7SSyo=
github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
-github.com/opencord/voltha-lib-go/v2 v2.2.10 h1:BbTkuMnIhz1Uwb6oCQIpQFd0ppH12hqxO5ZZWR+eGog=
-github.com/opencord/voltha-lib-go/v2 v2.2.10/go.mod h1:Hql0xWiBFqYM6WpE5G+w9//NdaIoR9mVzcvVYDxEnZY=
+github.com/opencord/voltha-lib-go/v2 v2.2.13 h1:8TxMjhqOL2vcDxO5uIaAd6Lj2Ahq/gAFiNNoUkM90cQ=
+github.com/opencord/voltha-lib-go/v2 v2.2.13/go.mod h1:g8WH4WTOJ0f40ZoqYFR4nyOLIAC84dOKDXsbT1ZErY4=
github.com/opencord/voltha-protos/v2 v2.0.1 h1:vcE0XxNVeu0SED0bW2lf2w24k/QMFrFqMexuedIyTEg=
github.com/opencord/voltha-protos/v2 v2.0.1/go.mod h1:6kOcfYi1CadWowFxI2SH5wLfHrsRECZLZlD2MFK6WDI=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
diff --git a/main.go b/main.go
index 921c3df..2ffa534 100644
--- a/main.go
+++ b/main.go
@@ -61,6 +61,13 @@
_, _ = log.AddPackage(log.JSON, log.DebugLevel, nil)
}
+const (
+ // DefaultLivelinessCheck to check the liveliness
+ DefaultLivelinessCheck = 30 * time.Second
+ // DefaultTimeout to close the connection
+ DefaultTimeout = 10
+)
+
func newAdapter(cf *config.AdapterFlags) *adapter {
var a adapter
a.instanceID = cf.InstanceID
@@ -138,6 +145,37 @@
if err = a.registerWithCore(ctx, -1); err != nil {
log.Fatal("error-registering-with-core")
}
+
+ // go routine to check the readiness and liveliness and update the probe status
+ go a.checKServicesReadiness(ctx)
+}
+
+/**
+This function checks the liveliness and readiness of the kakfa and kv-client services
+and update the status in the probe.
+*/
+func (a *adapter) checKServicesReadiness(ctx context.Context) {
+ for {
+ // checks the connectivity of the kv-client
+ if a.kvClient.IsConnectionUp(DefaultTimeout) {
+ probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusRunning)
+ log.Debugf("kv- store is reachable")
+ } else {
+ probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusNotReady)
+ log.Debugf("kv-store is not reachable")
+ }
+ //checks the connectivity of the kafka
+ a.kafkaClient.SendLiveness()
+ isKafkaReady := <-a.kafkaClient.EnableLivenessChannel(true)
+ if isKafkaReady {
+ probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusRunning)
+ log.Debugf("kafka is reachable")
+ } else {
+ probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
+ log.Debugf("kafka is not reachable")
+ }
+ time.Sleep(DefaultLivelinessCheck)
+ }
}
func (a *adapter) stop() {
diff --git a/main_test.go b/main_test.go
index 1192ff6..362a135 100644
--- a/main_test.go
+++ b/main_test.go
@@ -18,6 +18,8 @@
import (
"context"
"errors"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
"testing"
"github.com/opencord/voltha-lib-go/v2/pkg/kafka"
@@ -216,3 +218,11 @@
}
return errors.New("invalid topic")
}
+
+func (kc *mockKafkaClient) SendLiveness() error {
+ return status.Error(codes.Unimplemented, "SendLiveness")
+}
+
+func (kc *mockKafkaClient) EnableLivenessChannel(enable bool) chan bool {
+ return nil
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore/etcdclient.go b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore/etcdclient.go
index e8bc92c..3ae767c 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore/etcdclient.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore/etcdclient.go
@@ -38,6 +38,9 @@
lockToMutexLock sync.Mutex
}
+// Connection Timeout in Seconds
+var connTimeout int = 2
+
// NewEtcdClient returns a new client for the Etcd KV store
func NewEtcdClient(addr string, timeout int) (*EtcdClient, error) {
duration := GetDuration(timeout)
@@ -188,8 +191,13 @@
return nil, fmt.Errorf("unexpected-type%T", value)
}
+ duration := GetDuration(connTimeout)
+
// Create a lease
- resp, err := c.ectdAPI.Grant(context.Background(), ttl)
+ ctx, cancel := context.WithTimeout(context.Background(), duration)
+ defer cancel()
+
+ resp, err := c.ectdAPI.Grant(ctx, ttl)
if err != nil {
log.Error(err)
return nil, err
@@ -255,8 +263,12 @@
func (c *EtcdClient) ReleaseAllReservations() error {
c.writeLock.Lock()
defer c.writeLock.Unlock()
+ duration := GetDuration(connTimeout)
+ ctx, cancel := context.WithTimeout(context.Background(), duration)
+ defer cancel()
+
for key, leaseID := range c.keyReservations {
- _, err := c.ectdAPI.Revoke(context.Background(), *leaseID)
+ _, err := c.ectdAPI.Revoke(ctx, *leaseID)
if err != nil {
log.Errorw("cannot-release-reservation", log.Fields{"key": key, "error": err})
return err
@@ -277,8 +289,12 @@
if leaseID, ok = c.keyReservations[key]; !ok {
return nil
}
+ duration := GetDuration(connTimeout)
+ ctx, cancel := context.WithTimeout(context.Background(), duration)
+ defer cancel()
+
if leaseID != nil {
- _, err := c.ectdAPI.Revoke(context.Background(), *leaseID)
+ _, err := c.ectdAPI.Revoke(ctx, *leaseID)
if err != nil {
log.Error(err)
return err
@@ -299,9 +315,12 @@
if leaseID, ok = c.keyReservations[key]; !ok {
return errors.New("key-not-reserved")
}
+ duration := GetDuration(connTimeout)
+ ctx, cancel := context.WithTimeout(context.Background(), duration)
+ defer cancel()
if leaseID != nil {
- _, err := c.ectdAPI.KeepAliveOnce(context.Background(), *leaseID)
+ _, err := c.ectdAPI.KeepAliveOnce(ctx, *leaseID)
if err != nil {
log.Errorw("lease-may-have-expired", log.Fields{"error": err})
return err
diff --git a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/client.go b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/client.go
index 007aa74..488bf9f 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/client.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/client.go
@@ -55,6 +55,7 @@
DefaultNumberReplicas = 1
DefaultAutoCreateTopic = false
DefaultMetadataMaxRetry = 3
+ DefaultLivenessChannelInterval = time.Second * 30
)
// MsgClient represents the set of APIs a Kafka MsgClient must implement
@@ -66,4 +67,6 @@
Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ca.InterContainerMessage, error)
UnSubscribe(topic *Topic, ch <-chan *ca.InterContainerMessage) error
Send(msg interface{}, topic *Topic, keys ...string) error
+ SendLiveness() error
+ EnableLivenessChannel(enable bool) chan bool
}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/kafka_inter_container_library.go b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/kafka_inter_container_library.go
index c576bc6..3326191 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/kafka_inter_container_library.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/kafka_inter_container_library.go
@@ -764,6 +764,14 @@
return nil
}
+func (kp *InterContainerProxy) EnableLivenessChannel(enable bool) chan bool {
+ return kp.kafkaClient.EnableLivenessChannel(enable)
+}
+
+func (kp *InterContainerProxy) SendLiveness() error {
+ return kp.kafkaClient.SendLiveness()
+}
+
//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
//or an error on failure
func encodeRequest(rpc string, toTopic *Topic, replyTopic *Topic, key string, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
diff --git a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/sarama_client.go b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/sarama_client.go
index fc75026..a251c56 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/sarama_client.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/sarama_client.go
@@ -74,6 +74,11 @@
topicLockMap map[string]*sync.RWMutex
lockOfTopicLockMap sync.RWMutex
metadataMaxRetry int
+ alive bool
+ liveness chan bool
+ livenessChannelInterval time.Duration
+ lastLivenessTime time.Time
+ started bool
}
type SaramaClientOption func(*SaramaClient)
@@ -186,6 +191,12 @@
}
}
+func LivenessChannelInterval(opt time.Duration) SaramaClientOption {
+ return func(args *SaramaClient) {
+ args.livenessChannelInterval = opt
+ }
+}
+
func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
client := &SaramaClient{
KafkaHost: DefaultKafkaHost,
@@ -205,6 +216,7 @@
client.numReplicas = DefaultNumberReplicas
client.autoCreateTopic = DefaultAutoCreateTopic
client.metadataMaxRetry = DefaultMetadataMaxRetry
+ client.livenessChannelInterval = DefaultLivenessChannelInterval
for _, option := range opts {
option(client)
@@ -216,6 +228,10 @@
client.topicLockMap = make(map[string]*sync.RWMutex)
client.lockOfTopicLockMap = sync.RWMutex{}
client.lockOfGroupConsumers = sync.RWMutex{}
+
+ // alive until proven otherwise
+ client.alive = true
+
return client
}
@@ -259,12 +275,16 @@
log.Info("kafka-sarama-client-started")
+ sc.started = true
+
return nil
}
func (sc *SaramaClient) Stop() {
log.Info("stopping-sarama-client")
+ sc.started = false
+
//Send a message over the done channel to close all long running routines
sc.doneCh <- 1
@@ -438,6 +458,30 @@
return err
}
+func (sc *SaramaClient) updateLiveness(alive bool) {
+ // Post a consistent stream of liveness data to the channel,
+ // so that in a live state, the core does not timeout and
+ // send a forced liveness message. Production of liveness
+ // events to the channel is rate-limited by livenessChannelInterval.
+ if sc.liveness != nil {
+ if sc.alive != alive {
+ log.Info("update-liveness-channel-because-change")
+ sc.liveness <- alive
+ sc.lastLivenessTime = time.Now()
+ } else if time.Now().Sub(sc.lastLivenessTime) > sc.livenessChannelInterval {
+ log.Info("update-liveness-channel-because-interval")
+ sc.liveness <- alive
+ sc.lastLivenessTime = time.Now()
+ }
+ }
+
+ // Only emit a log message when the state changes
+ if sc.alive != alive {
+ log.Info("set-client-alive", log.Fields{"alive": alive})
+ sc.alive = alive
+ }
+}
+
// send formats and sends the request onto the kafka messaging bus.
func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) error {
@@ -474,8 +518,68 @@
select {
case ok := <-sc.producer.Successes():
log.Debugw("message-sent", log.Fields{"status": ok.Topic})
+ sc.updateLiveness(true)
case notOk := <-sc.producer.Errors():
log.Debugw("error-sending", log.Fields{"status": notOk})
+ if strings.Contains(notOk.Error(), "Failed to produce") {
+ sc.updateLiveness(false)
+ }
+ return notOk
+ }
+ return nil
+}
+
+// Enable the liveness monitor channel. This channel will report
+// a "true" or "false" on every publish, which indicates whether
+// or not the channel is still live. This channel is then picked up
+// by the service (i.e. rw_core / ro_core) to update readiness status
+// and/or take other actions.
+func (sc *SaramaClient) EnableLivenessChannel(enable bool) chan bool {
+ log.Infow("kafka-enable-liveness-channel", log.Fields{"enable": enable})
+ if enable {
+ if sc.liveness == nil {
+ log.Info("kafka-create-liveness-channel")
+ // At least 1, so we can immediately post to it without blocking
+ // Setting a bigger number (10) allows the monitor to fall behind
+ // without blocking others. The monitor shouldn't really fall
+ // behind...
+ sc.liveness = make(chan bool, 10)
+ // post intial state to the channel
+ sc.liveness <- sc.alive
+ }
+ } else {
+ // TODO: Think about whether we need the ability to turn off
+ // liveness monitoring
+ panic("Turning off liveness reporting is not supported")
+ }
+ return sc.liveness
+}
+
+// send an empty message on the liveness channel to check whether connectivity has
+// been restored.
+func (sc *SaramaClient) SendLiveness() error {
+ if !sc.started {
+ return fmt.Errorf("SendLiveness() called while not started")
+ }
+
+ kafkaMsg := &sarama.ProducerMessage{
+ Topic: "_liveness_test",
+ Value: sarama.StringEncoder(time.Now().Format(time.RFC3339)), // for debugging / informative use
+ }
+
+ // Send message to kafka
+ sc.producer.Input() <- kafkaMsg
+ // Wait for result
+ // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
+ select {
+ case ok := <-sc.producer.Successes():
+ log.Debugw("liveness-message-sent", log.Fields{"status": ok.Topic})
+ sc.updateLiveness(true)
+ case notOk := <-sc.producer.Errors():
+ log.Debugw("liveness-error-sending", log.Fields{"status": notOk})
+ if strings.Contains(notOk.Error(), "Failed to produce") {
+ sc.updateLiveness(false)
+ }
return notOk
}
return nil
@@ -713,7 +817,8 @@
config := scc.NewConfig()
config.ClientID = uuid.New().String()
config.Group.Mode = scc.ConsumerModeMultiplex
- //config.Consumer.Return.Errors = true
+ config.Consumer.Group.Heartbeat.Interval, _ = time.ParseDuration("1s")
+ config.Consumer.Return.Errors = true
//config.Group.Return.Notifications = false
//config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
//config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
@@ -791,16 +896,20 @@
select {
case err, ok := <-consumer.Errors():
if ok {
+ sc.updateLiveness(false)
log.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
} else {
+ log.Warnw("group-consumers-closed-err", log.Fields{"topic": topic.Name})
// channel is closed
break startloop
}
case msg, ok := <-consumer.Messages():
if !ok {
+ log.Warnw("group-consumers-closed-msg", log.Fields{"topic": topic.Name})
// Channel closed
break startloop
}
+ sc.updateLiveness(true)
log.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
msgBody := msg.Value
icm := &ic.InterContainerMessage{}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/probe/probe.go b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/probe/probe.go
index 9823566..7e6dbf9 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/probe/probe.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/probe/probe.go
@@ -47,6 +47,9 @@
// ServiceStatusFailed service has stopped because of an error
ServiceStatusFailed
+
+ // ServiceStatusNotReady service has started but is unable to accept requests
+ ServiceStatusNotReady
)
const (
@@ -71,6 +74,8 @@
return "Stopped"
case ServiceStatusFailed:
return "Failed"
+ case ServiceStatusNotReady:
+ return "NotReady"
}
}
@@ -137,6 +142,13 @@
if p.status == nil {
p.status = make(map[string]ServiceStatus)
}
+
+ // if status hasn't changed, avoid doing useless work
+ existingStatus, ok := p.status[name]
+ if ok && (existingStatus == status) {
+ return
+ }
+
p.status[name] = status
if p.readyFunc != nil {
p.isReady = p.readyFunc(p.status)
@@ -158,17 +170,41 @@
})
}
+func (p *Probe) GetStatus(name string) ServiceStatus {
+ p.mutex.Lock()
+ defer p.mutex.Unlock()
+
+ if p.status == nil {
+ p.status = make(map[string]ServiceStatus)
+ }
+
+ currentStatus, ok := p.status[name]
+ if ok {
+ return currentStatus
+ }
+
+ return ServiceStatusUnknown
+}
+
+func GetProbeFromContext(ctx context.Context) *Probe {
+ if ctx != nil {
+ if value := ctx.Value(ProbeContextKey); value != nil {
+ if p, ok := value.(*Probe); ok {
+ return p
+ }
+ }
+ }
+ return nil
+}
+
// UpdateStatusFromContext a convenience function to pull the Probe reference from the
// Context, if it exists, and then calling UpdateStatus on that Probe reference. If Context
// is nil or if a Probe reference is not associated with the ProbeContextKey then nothing
// happens
func UpdateStatusFromContext(ctx context.Context, name string, status ServiceStatus) {
- if ctx != nil {
- if value := ctx.Value(ProbeContextKey); value != nil {
- if p, ok := value.(*Probe); ok {
- p.UpdateStatus(name, status)
- }
- }
+ p := GetProbeFromContext(ctx)
+ if p != nil {
+ p.UpdateStatus(name, status)
}
}
@@ -225,6 +261,10 @@
log.Fatal(s.ListenAndServe())
}
+func (p *Probe) IsReady() bool {
+ return p.isReady
+}
+
// defaultReadyFunc if all services are running then ready, else not
func defaultReadyFunc(services map[string]ServiceStatus) bool {
if len(services) == 0 {
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 6888974..c5b5c0f 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -59,7 +59,7 @@
github.com/mitchellh/go-homedir
# github.com/mitchellh/mapstructure v1.1.2
github.com/mitchellh/mapstructure
-# github.com/opencord/voltha-lib-go/v2 v2.2.10
+# github.com/opencord/voltha-lib-go/v2 v2.2.13
github.com/opencord/voltha-lib-go/v2/pkg/adapters
github.com/opencord/voltha-lib-go/v2/pkg/adapters/adapterif
github.com/opencord/voltha-lib-go/v2/pkg/adapters/common