VOL-2258 release using voltha-lib-go 2.2.16

Change-Id: I4e5a312fe4374fb423aa6a3e14237c5d9caa9814
diff --git a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore/client.go b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore/client.go
index c0ebe5f..97fbec9 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore/client.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore/client.go
@@ -79,10 +79,10 @@
 
 // Client represents the set of APIs a KV Client must implement
 type Client interface {
-	List(key string, timeout int, lock ...bool) (map[string]*KVPair, error)
-	Get(key string, timeout int, lock ...bool) (*KVPair, error)
-	Put(key string, value interface{}, timeout int, lock ...bool) error
-	Delete(key string, timeout int, lock ...bool) error
+	List(key string, timeout int) (map[string]*KVPair, error)
+	Get(key string, timeout int) (*KVPair, error)
+	Put(key string, value interface{}, timeout int) error
+	Delete(key string, timeout int) error
 	Reserve(key string, value interface{}, ttl int64) (interface{}, error)
 	ReleaseReservation(key string) error
 	ReleaseAllReservations() error
diff --git a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore/consulclient.go b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore/consulclient.go
index e0e8550..a94de4d 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore/consulclient.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore/consulclient.go
@@ -71,7 +71,7 @@
 
 // List returns an array of key-value pairs with key as a prefix.  Timeout defines how long the function will
 // wait for a response
-func (c *ConsulClient) List(key string, timeout int, lock ...bool) (map[string]*KVPair, error) {
+func (c *ConsulClient) List(key string, timeout int) (map[string]*KVPair, error) {
 	duration := GetDuration(timeout)
 
 	kv := c.consul.KV()
@@ -92,7 +92,7 @@
 
 // Get returns a key-value pair for a given key. Timeout defines how long the function will
 // wait for a response
-func (c *ConsulClient) Get(key string, timeout int, lock ...bool) (*KVPair, error) {
+func (c *ConsulClient) Get(key string, timeout int) (*KVPair, error) {
 
 	duration := GetDuration(timeout)
 
@@ -115,7 +115,7 @@
 // Put writes a key-value pair to the KV store.  Value can only be a string or []byte since the consul API
 // accepts only a []byte as a value for a put operation. Timeout defines how long the function will
 // wait for a response
-func (c *ConsulClient) Put(key string, value interface{}, timeout int, lock ...bool) error {
+func (c *ConsulClient) Put(key string, value interface{}, timeout int) error {
 
 	// Validate that we can create a byte array from the value as consul API expects a byte array
 	var val []byte
@@ -141,7 +141,7 @@
 
 // Delete removes a key from the KV store. Timeout defines how long the function will
 // wait for a response
-func (c *ConsulClient) Delete(key string, timeout int, lock ...bool) error {
+func (c *ConsulClient) Delete(key string, timeout int) error {
 	kv := c.consul.KV()
 	var writeOptions consulapi.WriteOptions
 	c.writeLock.Lock()
diff --git a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore/etcdclient.go b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore/etcdclient.go
index 8db047c..3ae767c 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore/etcdclient.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore/etcdclient.go
@@ -38,6 +38,9 @@
 	lockToMutexLock  sync.Mutex
 }
 
+// Connection Timeout in Seconds
+var connTimeout int = 2
+
 // NewEtcdClient returns a new client for the Etcd KV store
 func NewEtcdClient(addr string, timeout int) (*EtcdClient, error) {
 	duration := GetDuration(timeout)
@@ -71,7 +74,7 @@
 
 // List returns an array of key-value pairs with key as a prefix.  Timeout defines how long the function will
 // wait for a response
-func (c *EtcdClient) List(key string, timeout int, lock ...bool) (map[string]*KVPair, error) {
+func (c *EtcdClient) List(key string, timeout int) (map[string]*KVPair, error) {
 	duration := GetDuration(timeout)
 
 	ctx, cancel := context.WithTimeout(context.Background(), duration)
@@ -91,7 +94,7 @@
 
 // Get returns a key-value pair for a given key. Timeout defines how long the function will
 // wait for a response
-func (c *EtcdClient) Get(key string, timeout int, lock ...bool) (*KVPair, error) {
+func (c *EtcdClient) Get(key string, timeout int) (*KVPair, error) {
 	duration := GetDuration(timeout)
 
 	ctx, cancel := context.WithTimeout(context.Background(), duration)
@@ -112,7 +115,7 @@
 // Put writes a key-value pair to the KV store.  Value can only be a string or []byte since the etcd API
 // accepts only a string as a value for a put operation. Timeout defines how long the function will
 // wait for a response
-func (c *EtcdClient) Put(key string, value interface{}, timeout int, lock ...bool) error {
+func (c *EtcdClient) Put(key string, value interface{}, timeout int) error {
 
 	// Validate that we can convert value to a string as etcd API expects a string
 	var val string
@@ -155,7 +158,7 @@
 
 // Delete removes a key from the KV store. Timeout defines how long the function will
 // wait for a response
-func (c *EtcdClient) Delete(key string, timeout int, lock ...bool) error {
+func (c *EtcdClient) Delete(key string, timeout int) error {
 
 	duration := GetDuration(timeout)
 
@@ -188,8 +191,13 @@
 		return nil, fmt.Errorf("unexpected-type%T", value)
 	}
 
+	duration := GetDuration(connTimeout)
+
 	// Create a lease
-	resp, err := c.ectdAPI.Grant(context.Background(), ttl)
+	ctx, cancel := context.WithTimeout(context.Background(), duration)
+	defer cancel()
+
+	resp, err := c.ectdAPI.Grant(ctx, ttl)
 	if err != nil {
 		log.Error(err)
 		return nil, err
@@ -233,7 +241,7 @@
 		}
 	} else {
 		// Read the Key to ensure this is our Key
-		m, err := c.Get(key, defaultKVGetTimeout, false)
+		m, err := c.Get(key, defaultKVGetTimeout)
 		if err != nil {
 			return nil, err
 		}
@@ -255,8 +263,12 @@
 func (c *EtcdClient) ReleaseAllReservations() error {
 	c.writeLock.Lock()
 	defer c.writeLock.Unlock()
+	duration := GetDuration(connTimeout)
+	ctx, cancel := context.WithTimeout(context.Background(), duration)
+	defer cancel()
+
 	for key, leaseID := range c.keyReservations {
-		_, err := c.ectdAPI.Revoke(context.Background(), *leaseID)
+		_, err := c.ectdAPI.Revoke(ctx, *leaseID)
 		if err != nil {
 			log.Errorw("cannot-release-reservation", log.Fields{"key": key, "error": err})
 			return err
@@ -277,8 +289,12 @@
 	if leaseID, ok = c.keyReservations[key]; !ok {
 		return nil
 	}
+	duration := GetDuration(connTimeout)
+	ctx, cancel := context.WithTimeout(context.Background(), duration)
+	defer cancel()
+
 	if leaseID != nil {
-		_, err := c.ectdAPI.Revoke(context.Background(), *leaseID)
+		_, err := c.ectdAPI.Revoke(ctx, *leaseID)
 		if err != nil {
 			log.Error(err)
 			return err
@@ -299,9 +315,12 @@
 	if leaseID, ok = c.keyReservations[key]; !ok {
 		return errors.New("key-not-reserved")
 	}
+	duration := GetDuration(connTimeout)
+	ctx, cancel := context.WithTimeout(context.Background(), duration)
+	defer cancel()
 
 	if leaseID != nil {
-		_, err := c.ectdAPI.KeepAliveOnce(context.Background(), *leaseID)
+		_, err := c.ectdAPI.KeepAliveOnce(ctx, *leaseID)
 		if err != nil {
 			log.Errorw("lease-may-have-expired", log.Fields{"error": err})
 			return err
diff --git a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/client.go b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/client.go
index 007aa74..488bf9f 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/client.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/client.go
@@ -55,6 +55,7 @@
 	DefaultNumberReplicas           = 1
 	DefaultAutoCreateTopic          = false
 	DefaultMetadataMaxRetry         = 3
+	DefaultLivenessChannelInterval  = time.Second * 30
 )
 
 // MsgClient represents the set of APIs  a Kafka MsgClient must implement
@@ -66,4 +67,6 @@
 	Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ca.InterContainerMessage, error)
 	UnSubscribe(topic *Topic, ch <-chan *ca.InterContainerMessage) error
 	Send(msg interface{}, topic *Topic, keys ...string) error
+	SendLiveness() error
+	EnableLivenessChannel(enable bool) chan bool
 }
diff --git a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/kafka_inter_container_library.go b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/kafka_inter_container_library.go
index c576bc6..3326191 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/kafka_inter_container_library.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/kafka_inter_container_library.go
@@ -764,6 +764,14 @@
 	return nil
 }
 
+func (kp *InterContainerProxy) EnableLivenessChannel(enable bool) chan bool {
+	return kp.kafkaClient.EnableLivenessChannel(enable)
+}
+
+func (kp *InterContainerProxy) SendLiveness() error {
+	return kp.kafkaClient.SendLiveness()
+}
+
 //formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
 //or an error on failure
 func encodeRequest(rpc string, toTopic *Topic, replyTopic *Topic, key string, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
diff --git a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/sarama_client.go b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/sarama_client.go
index fc75026..a251c56 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/sarama_client.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/sarama_client.go
@@ -74,6 +74,11 @@
 	topicLockMap                  map[string]*sync.RWMutex
 	lockOfTopicLockMap            sync.RWMutex
 	metadataMaxRetry              int
+	alive                         bool
+	liveness                      chan bool
+	livenessChannelInterval       time.Duration
+	lastLivenessTime              time.Time
+	started                       bool
 }
 
 type SaramaClientOption func(*SaramaClient)
@@ -186,6 +191,12 @@
 	}
 }
 
+func LivenessChannelInterval(opt time.Duration) SaramaClientOption {
+	return func(args *SaramaClient) {
+		args.livenessChannelInterval = opt
+	}
+}
+
 func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
 	client := &SaramaClient{
 		KafkaHost: DefaultKafkaHost,
@@ -205,6 +216,7 @@
 	client.numReplicas = DefaultNumberReplicas
 	client.autoCreateTopic = DefaultAutoCreateTopic
 	client.metadataMaxRetry = DefaultMetadataMaxRetry
+	client.livenessChannelInterval = DefaultLivenessChannelInterval
 
 	for _, option := range opts {
 		option(client)
@@ -216,6 +228,10 @@
 	client.topicLockMap = make(map[string]*sync.RWMutex)
 	client.lockOfTopicLockMap = sync.RWMutex{}
 	client.lockOfGroupConsumers = sync.RWMutex{}
+
+	// alive until proven otherwise
+	client.alive = true
+
 	return client
 }
 
@@ -259,12 +275,16 @@
 
 	log.Info("kafka-sarama-client-started")
 
+	sc.started = true
+
 	return nil
 }
 
 func (sc *SaramaClient) Stop() {
 	log.Info("stopping-sarama-client")
 
+	sc.started = false
+
 	//Send a message over the done channel to close all long running routines
 	sc.doneCh <- 1
 
@@ -438,6 +458,30 @@
 	return err
 }
 
+func (sc *SaramaClient) updateLiveness(alive bool) {
+	// Post a consistent stream of liveness data to the channel,
+	// so that in a live state, the core does not timeout and
+	// send a forced liveness message. Production of liveness
+	// events to the channel is rate-limited by livenessChannelInterval.
+	if sc.liveness != nil {
+		if sc.alive != alive {
+			log.Info("update-liveness-channel-because-change")
+			sc.liveness <- alive
+			sc.lastLivenessTime = time.Now()
+		} else if time.Now().Sub(sc.lastLivenessTime) > sc.livenessChannelInterval {
+			log.Info("update-liveness-channel-because-interval")
+			sc.liveness <- alive
+			sc.lastLivenessTime = time.Now()
+		}
+	}
+
+	// Only emit a log message when the state changes
+	if sc.alive != alive {
+		log.Info("set-client-alive", log.Fields{"alive": alive})
+		sc.alive = alive
+	}
+}
+
 // send formats and sends the request onto the kafka messaging bus.
 func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) error {
 
@@ -474,8 +518,68 @@
 	select {
 	case ok := <-sc.producer.Successes():
 		log.Debugw("message-sent", log.Fields{"status": ok.Topic})
+		sc.updateLiveness(true)
 	case notOk := <-sc.producer.Errors():
 		log.Debugw("error-sending", log.Fields{"status": notOk})
+		if strings.Contains(notOk.Error(), "Failed to produce") {
+			sc.updateLiveness(false)
+		}
+		return notOk
+	}
+	return nil
+}
+
+// Enable the liveness monitor channel. This channel will report
+// a "true" or "false" on every publish, which indicates whether
+// or not the channel is still live. This channel is then picked up
+// by the service (i.e. rw_core / ro_core) to update readiness status
+// and/or take other actions.
+func (sc *SaramaClient) EnableLivenessChannel(enable bool) chan bool {
+	log.Infow("kafka-enable-liveness-channel", log.Fields{"enable": enable})
+	if enable {
+		if sc.liveness == nil {
+			log.Info("kafka-create-liveness-channel")
+			// At least 1, so we can immediately post to it without blocking
+			// Setting a bigger number (10) allows the monitor to fall behind
+			// without blocking others. The monitor shouldn't really fall
+			// behind...
+			sc.liveness = make(chan bool, 10)
+			// post intial state to the channel
+			sc.liveness <- sc.alive
+		}
+	} else {
+		// TODO: Think about whether we need the ability to turn off
+		// liveness monitoring
+		panic("Turning off liveness reporting is not supported")
+	}
+	return sc.liveness
+}
+
+// send an empty message on the liveness channel to check whether connectivity has
+// been restored.
+func (sc *SaramaClient) SendLiveness() error {
+	if !sc.started {
+		return fmt.Errorf("SendLiveness() called while not started")
+	}
+
+	kafkaMsg := &sarama.ProducerMessage{
+		Topic: "_liveness_test",
+		Value: sarama.StringEncoder(time.Now().Format(time.RFC3339)), // for debugging / informative use
+	}
+
+	// Send message to kafka
+	sc.producer.Input() <- kafkaMsg
+	// Wait for result
+	// TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
+	select {
+	case ok := <-sc.producer.Successes():
+		log.Debugw("liveness-message-sent", log.Fields{"status": ok.Topic})
+		sc.updateLiveness(true)
+	case notOk := <-sc.producer.Errors():
+		log.Debugw("liveness-error-sending", log.Fields{"status": notOk})
+		if strings.Contains(notOk.Error(), "Failed to produce") {
+			sc.updateLiveness(false)
+		}
 		return notOk
 	}
 	return nil
@@ -713,7 +817,8 @@
 	config := scc.NewConfig()
 	config.ClientID = uuid.New().String()
 	config.Group.Mode = scc.ConsumerModeMultiplex
-	//config.Consumer.Return.Errors = true
+	config.Consumer.Group.Heartbeat.Interval, _ = time.ParseDuration("1s")
+	config.Consumer.Return.Errors = true
 	//config.Group.Return.Notifications = false
 	//config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
 	//config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
@@ -791,16 +896,20 @@
 		select {
 		case err, ok := <-consumer.Errors():
 			if ok {
+				sc.updateLiveness(false)
 				log.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
 			} else {
+				log.Warnw("group-consumers-closed-err", log.Fields{"topic": topic.Name})
 				// channel is closed
 				break startloop
 			}
 		case msg, ok := <-consumer.Messages():
 			if !ok {
+				log.Warnw("group-consumers-closed-msg", log.Fields{"topic": topic.Name})
 				// Channel closed
 				break startloop
 			}
+			sc.updateLiveness(true)
 			log.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
 			msgBody := msg.Value
 			icm := &ic.InterContainerMessage{}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/probe/probe.go b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/probe/probe.go
index 9823566..7e6dbf9 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/probe/probe.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/probe/probe.go
@@ -47,6 +47,9 @@
 
 	// ServiceStatusFailed service has stopped because of an error
 	ServiceStatusFailed
+
+	// ServiceStatusNotReady service has started but is unable to accept requests
+	ServiceStatusNotReady
 )
 
 const (
@@ -71,6 +74,8 @@
 		return "Stopped"
 	case ServiceStatusFailed:
 		return "Failed"
+	case ServiceStatusNotReady:
+		return "NotReady"
 	}
 }
 
@@ -137,6 +142,13 @@
 	if p.status == nil {
 		p.status = make(map[string]ServiceStatus)
 	}
+
+	// if status hasn't changed, avoid doing useless work
+	existingStatus, ok := p.status[name]
+	if ok && (existingStatus == status) {
+		return
+	}
+
 	p.status[name] = status
 	if p.readyFunc != nil {
 		p.isReady = p.readyFunc(p.status)
@@ -158,17 +170,41 @@
 		})
 }
 
+func (p *Probe) GetStatus(name string) ServiceStatus {
+	p.mutex.Lock()
+	defer p.mutex.Unlock()
+
+	if p.status == nil {
+		p.status = make(map[string]ServiceStatus)
+	}
+
+	currentStatus, ok := p.status[name]
+	if ok {
+		return currentStatus
+	}
+
+	return ServiceStatusUnknown
+}
+
+func GetProbeFromContext(ctx context.Context) *Probe {
+	if ctx != nil {
+		if value := ctx.Value(ProbeContextKey); value != nil {
+			if p, ok := value.(*Probe); ok {
+				return p
+			}
+		}
+	}
+	return nil
+}
+
 // UpdateStatusFromContext a convenience function to pull the Probe reference from the
 // Context, if it exists, and then calling UpdateStatus on that Probe reference. If Context
 // is nil or if a Probe reference is not associated with the ProbeContextKey then nothing
 // happens
 func UpdateStatusFromContext(ctx context.Context, name string, status ServiceStatus) {
-	if ctx != nil {
-		if value := ctx.Value(ProbeContextKey); value != nil {
-			if p, ok := value.(*Probe); ok {
-				p.UpdateStatus(name, status)
-			}
-		}
+	p := GetProbeFromContext(ctx)
+	if p != nil {
+		p.UpdateStatus(name, status)
 	}
 }
 
@@ -225,6 +261,10 @@
 	log.Fatal(s.ListenAndServe())
 }
 
+func (p *Probe) IsReady() bool {
+	return p.isReady
+}
+
 // defaultReadyFunc if all services are running then ready, else not
 func defaultReadyFunc(services map[string]ServiceStatus) bool {
 	if len(services) == 0 {
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 9ed318b..bfb23d9 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -55,7 +55,7 @@
 github.com/mitchellh/go-homedir
 # github.com/mitchellh/mapstructure v1.1.2
 github.com/mitchellh/mapstructure
-# github.com/opencord/voltha-lib-go/v2 v2.2.9
+# github.com/opencord/voltha-lib-go/v2 v2.2.16
 github.com/opencord/voltha-lib-go/v2/pkg/adapters
 github.com/opencord/voltha-lib-go/v2/pkg/adapters/common
 github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore