VOL-2631 Cleanup Kafka Interadapter Proxy on shutdown;
Exit gracefully rather than Fatal() on kafka shutdown

Change-Id: Ic4506a8f4e5127d10a0db130a81584c0bf8c8428
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/backend.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/backend.go
index 96829c5..04fe35d 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/backend.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/backend.go
@@ -247,14 +247,14 @@
 }
 
 // CreateWatch starts watching events for the specified key
-func (b *Backend) CreateWatch(ctx context.Context, key string) chan *kvstore.Event {
+func (b *Backend) CreateWatch(ctx context.Context, key string, withPrefix bool) chan *kvstore.Event {
 	b.Lock()
 	defer b.Unlock()
 
 	formattedPath := b.makePath(key)
 	logger.Debugw("creating-key-watch", log.Fields{"key": key, "path": formattedPath})
 
-	return b.Client.Watch(ctx, formattedPath)
+	return b.Client.Watch(ctx, formattedPath, withPrefix)
 }
 
 // DeleteWatch stops watching events for the specified key
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/client.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/client.go
index d30e049..b9cb1ee 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/client.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/client.go
@@ -81,7 +81,7 @@
 	ReleaseReservation(ctx context.Context, key string) error
 	ReleaseAllReservations(ctx context.Context) error
 	RenewReservation(ctx context.Context, key string) error
-	Watch(ctx context.Context, key string) chan *Event
+	Watch(ctx context.Context, key string, withPrefix bool) chan *Event
 	AcquireLock(ctx context.Context, lockName string, timeout int) error
 	ReleaseLock(lockName string) error
 	IsConnectionUp(ctx context.Context) bool // timeout in second
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/consulclient.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/consulclient.go
index fdf39be..bdf2d10 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/consulclient.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/consulclient.go
@@ -360,7 +360,7 @@
 
 // Watch provides the watch capability on a given key.  It returns a channel onto which the callee needs to
 // listen to receive Events.
-func (c *ConsulClient) Watch(ctx context.Context, key string) chan *Event {
+func (c *ConsulClient) Watch(ctx context.Context, key string, withPrefix bool) chan *Event {
 
 	// Create a new channel
 	ch := make(chan *Event, maxClientChannelBufferSize)
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/etcdclient.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/etcdclient.go
index 2d126f7..1014ada 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/etcdclient.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/etcdclient.go
@@ -298,10 +298,15 @@
 
 // Watch provides the watch capability on a given key.  It returns a channel onto which the callee needs to
 // listen to receive Events.
-func (c *EtcdClient) Watch(ctx context.Context, key string) chan *Event {
+func (c *EtcdClient) Watch(ctx context.Context, key string, withPrefix bool) chan *Event {
 	w := v3Client.NewWatcher(c.ectdAPI)
 	ctx, cancel := context.WithCancel(ctx)
-	channel := w.Watch(ctx, key)
+	var channel v3Client.WatchChan
+	if withPrefix {
+		channel = w.Watch(ctx, key, v3Client.WithPrefix())
+	} else {
+		channel = w.Watch(ctx, key)
+	}
 
 	// Create a new channel
 	ch := make(chan *Event, maxClientChannelBufferSize)
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
index aa77ffb..91b2143 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
@@ -189,9 +189,15 @@
 	kp.doneOnce.Do(func() { close(kp.doneCh) })
 	// TODO : Perform cleanup
 	kp.kafkaClient.Stop()
-	//kp.deleteAllTopicRequestHandlerChannelMap()
-	//kp.deleteAllTopicResponseChannelMap()
-	//kp.deleteAllTransactionIdToChannelMap()
+	err := kp.deleteAllTopicRequestHandlerChannelMap()
+	if err != nil {
+		log.Errorw("failed-delete-all-topic-request-handler-channel-map", log.Fields{"error": err})
+	}
+	err = kp.deleteAllTopicResponseChannelMap()
+	if err != nil {
+		log.Errorw("failed-delete-all-topic-response-channel-map", log.Fields{"error": err})
+	}
+	kp.deleteAllTransactionIdToChannelMap()
 }
 
 func (kp *interContainerProxy) GetDefaultTopic() *Topic {
@@ -409,17 +415,25 @@
 
 // nolint: unused
 func (kp *interContainerProxy) deleteAllTopicResponseChannelMap() error {
+	logger.Debug("delete-all-topic-response-channel")
 	kp.lockTopicResponseChannelMap.Lock()
 	defer kp.lockTopicResponseChannelMap.Unlock()
-	var err error
+	var unsubscribeFailTopics []string
 	for topic := range kp.topicToResponseChannelMap {
 		// Unsubscribe to this topic first - this will close the subscribed channel
-		if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
+		if err := kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
+			unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
 			logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
+			// Do not return. Continue to try to unsubscribe to other topics.
+		} else {
+			// Only delete from channel map if successfully unsubscribed.
+			delete(kp.topicToResponseChannelMap, topic)
 		}
-		delete(kp.topicToResponseChannelMap, topic)
 	}
-	return err
+	if len(unsubscribeFailTopics) > 0 {
+		return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
+	}
+	return nil
 }
 
 func (kp *interContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
@@ -447,17 +461,25 @@
 
 // nolint: unused
 func (kp *interContainerProxy) deleteAllTopicRequestHandlerChannelMap() error {
+	logger.Debug("delete-all-topic-request-channel")
 	kp.lockTopicRequestHandlerChannelMap.Lock()
 	defer kp.lockTopicRequestHandlerChannelMap.Unlock()
-	var err error
+	var unsubscribeFailTopics []string
 	for topic := range kp.topicToRequestHandlerChannelMap {
 		// Close the kafka client client first by unsubscribing to this topic
-		if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
+		if err := kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
+			unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
 			logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
+			// Do not return. Continue to try to unsubscribe to other topics.
+		} else {
+			// Only delete from channel map if successfully unsubscribed.
+			delete(kp.topicToRequestHandlerChannelMap, topic)
 		}
-		delete(kp.topicToRequestHandlerChannelMap, topic)
 	}
-	return err
+	if len(unsubscribeFailTopics) > 0 {
+		return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
+	}
+	return nil
 }
 
 func (kp *interContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
@@ -491,6 +513,7 @@
 
 // nolint: unused
 func (kp *interContainerProxy) deleteAllTransactionIdToChannelMap() {
+	logger.Debug("delete-all-transaction-id-channel-map")
 	kp.lockTransactionIdToChannelMap.Lock()
 	defer kp.lockTransactionIdToChannelMap.Unlock()
 	for key, value := range kp.transactionIdToChannelMap {
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/log/log.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/log/log.go
index 69e22a4..3ebdd3a 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/log/log.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/log/log.go
@@ -193,6 +193,22 @@
 	return 0, errors.New("Given LogLevel is invalid : " + l)
 }
 
+func LogLevelToString(l LogLevel) (string, error) {
+	switch l {
+	case DebugLevel:
+		return "DEBUG", nil
+	case InfoLevel:
+		return "INFO", nil
+	case WarnLevel:
+		return "WARN", nil
+	case ErrorLevel:
+		return "ERROR", nil
+	case FatalLevel:
+		return "FATAL", nil
+	}
+	return "", errors.New("Given LogLevel is invalid " + string(l))
+}
+
 func getDefaultConfig(outputType string, level LogLevel, defaultFields Fields) zp.Config {
 	return zp.Config{
 		Level:            logLevelToAtomicLevel(level),
diff --git a/vendor/modules.txt b/vendor/modules.txt
index d52f5da..fcf7bc9 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -63,7 +63,7 @@
 github.com/mitchellh/go-homedir
 # github.com/mitchellh/mapstructure v1.1.2
 github.com/mitchellh/mapstructure
-# github.com/opencord/voltha-lib-go/v3 v3.0.12
+# github.com/opencord/voltha-lib-go/v3 v3.0.14
 github.com/opencord/voltha-lib-go/v3/pkg/adapters
 github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif
 github.com/opencord/voltha-lib-go/v3/pkg/adapters/common