VOL-2631 Cleanup Kafka Interadapter Proxy on shutdown;
Exit gracefully rather than Fatal() on kafka shutdown
Change-Id: Ic4506a8f4e5127d10a0db130a81584c0bf8c8428
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/backend.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/backend.go
index 96829c5..04fe35d 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/backend.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/backend.go
@@ -247,14 +247,14 @@
}
// CreateWatch starts watching events for the specified key
-func (b *Backend) CreateWatch(ctx context.Context, key string) chan *kvstore.Event {
+func (b *Backend) CreateWatch(ctx context.Context, key string, withPrefix bool) chan *kvstore.Event {
b.Lock()
defer b.Unlock()
formattedPath := b.makePath(key)
logger.Debugw("creating-key-watch", log.Fields{"key": key, "path": formattedPath})
- return b.Client.Watch(ctx, formattedPath)
+ return b.Client.Watch(ctx, formattedPath, withPrefix)
}
// DeleteWatch stops watching events for the specified key
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/client.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/client.go
index d30e049..b9cb1ee 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/client.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/client.go
@@ -81,7 +81,7 @@
ReleaseReservation(ctx context.Context, key string) error
ReleaseAllReservations(ctx context.Context) error
RenewReservation(ctx context.Context, key string) error
- Watch(ctx context.Context, key string) chan *Event
+ Watch(ctx context.Context, key string, withPrefix bool) chan *Event
AcquireLock(ctx context.Context, lockName string, timeout int) error
ReleaseLock(lockName string) error
IsConnectionUp(ctx context.Context) bool // timeout in second
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/consulclient.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/consulclient.go
index fdf39be..bdf2d10 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/consulclient.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/consulclient.go
@@ -360,7 +360,7 @@
// Watch provides the watch capability on a given key. It returns a channel onto which the callee needs to
// listen to receive Events.
-func (c *ConsulClient) Watch(ctx context.Context, key string) chan *Event {
+func (c *ConsulClient) Watch(ctx context.Context, key string, withPrefix bool) chan *Event {
// Create a new channel
ch := make(chan *Event, maxClientChannelBufferSize)
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/etcdclient.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/etcdclient.go
index 2d126f7..1014ada 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/etcdclient.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/etcdclient.go
@@ -298,10 +298,15 @@
// Watch provides the watch capability on a given key. It returns a channel onto which the callee needs to
// listen to receive Events.
-func (c *EtcdClient) Watch(ctx context.Context, key string) chan *Event {
+func (c *EtcdClient) Watch(ctx context.Context, key string, withPrefix bool) chan *Event {
w := v3Client.NewWatcher(c.ectdAPI)
ctx, cancel := context.WithCancel(ctx)
- channel := w.Watch(ctx, key)
+ var channel v3Client.WatchChan
+ if withPrefix {
+ channel = w.Watch(ctx, key, v3Client.WithPrefix())
+ } else {
+ channel = w.Watch(ctx, key)
+ }
// Create a new channel
ch := make(chan *Event, maxClientChannelBufferSize)
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
index aa77ffb..91b2143 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
@@ -189,9 +189,15 @@
kp.doneOnce.Do(func() { close(kp.doneCh) })
// TODO : Perform cleanup
kp.kafkaClient.Stop()
- //kp.deleteAllTopicRequestHandlerChannelMap()
- //kp.deleteAllTopicResponseChannelMap()
- //kp.deleteAllTransactionIdToChannelMap()
+ err := kp.deleteAllTopicRequestHandlerChannelMap()
+ if err != nil {
+ log.Errorw("failed-delete-all-topic-request-handler-channel-map", log.Fields{"error": err})
+ }
+ err = kp.deleteAllTopicResponseChannelMap()
+ if err != nil {
+ log.Errorw("failed-delete-all-topic-response-channel-map", log.Fields{"error": err})
+ }
+ kp.deleteAllTransactionIdToChannelMap()
}
func (kp *interContainerProxy) GetDefaultTopic() *Topic {
@@ -409,17 +415,25 @@
// nolint: unused
func (kp *interContainerProxy) deleteAllTopicResponseChannelMap() error {
+ logger.Debug("delete-all-topic-response-channel")
kp.lockTopicResponseChannelMap.Lock()
defer kp.lockTopicResponseChannelMap.Unlock()
- var err error
+ var unsubscribeFailTopics []string
for topic := range kp.topicToResponseChannelMap {
// Unsubscribe to this topic first - this will close the subscribed channel
- if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
+ if err := kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
+ unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
+ // Do not return. Continue to try to unsubscribe to other topics.
+ } else {
+ // Only delete from channel map if successfully unsubscribed.
+ delete(kp.topicToResponseChannelMap, topic)
}
- delete(kp.topicToResponseChannelMap, topic)
}
- return err
+ if len(unsubscribeFailTopics) > 0 {
+ return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
+ }
+ return nil
}
func (kp *interContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
@@ -447,17 +461,25 @@
// nolint: unused
func (kp *interContainerProxy) deleteAllTopicRequestHandlerChannelMap() error {
+ logger.Debug("delete-all-topic-request-channel")
kp.lockTopicRequestHandlerChannelMap.Lock()
defer kp.lockTopicRequestHandlerChannelMap.Unlock()
- var err error
+ var unsubscribeFailTopics []string
for topic := range kp.topicToRequestHandlerChannelMap {
// Close the kafka client client first by unsubscribing to this topic
- if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
+ if err := kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
+ unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
+ // Do not return. Continue to try to unsubscribe to other topics.
+ } else {
+ // Only delete from channel map if successfully unsubscribed.
+ delete(kp.topicToRequestHandlerChannelMap, topic)
}
- delete(kp.topicToRequestHandlerChannelMap, topic)
}
- return err
+ if len(unsubscribeFailTopics) > 0 {
+ return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
+ }
+ return nil
}
func (kp *interContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
@@ -491,6 +513,7 @@
// nolint: unused
func (kp *interContainerProxy) deleteAllTransactionIdToChannelMap() {
+ logger.Debug("delete-all-transaction-id-channel-map")
kp.lockTransactionIdToChannelMap.Lock()
defer kp.lockTransactionIdToChannelMap.Unlock()
for key, value := range kp.transactionIdToChannelMap {
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/log/log.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/log/log.go
index 69e22a4..3ebdd3a 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/log/log.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/log/log.go
@@ -193,6 +193,22 @@
return 0, errors.New("Given LogLevel is invalid : " + l)
}
+func LogLevelToString(l LogLevel) (string, error) {
+ switch l {
+ case DebugLevel:
+ return "DEBUG", nil
+ case InfoLevel:
+ return "INFO", nil
+ case WarnLevel:
+ return "WARN", nil
+ case ErrorLevel:
+ return "ERROR", nil
+ case FatalLevel:
+ return "FATAL", nil
+ }
+ return "", errors.New("Given LogLevel is invalid " + string(l))
+}
+
func getDefaultConfig(outputType string, level LogLevel, defaultFields Fields) zp.Config {
return zp.Config{
Level: logLevelToAtomicLevel(level),
diff --git a/vendor/modules.txt b/vendor/modules.txt
index d52f5da..fcf7bc9 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -63,7 +63,7 @@
github.com/mitchellh/go-homedir
# github.com/mitchellh/mapstructure v1.1.2
github.com/mitchellh/mapstructure
-# github.com/opencord/voltha-lib-go/v3 v3.0.12
+# github.com/opencord/voltha-lib-go/v3 v3.0.14
github.com/opencord/voltha-lib-go/v3/pkg/adapters
github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif
github.com/opencord/voltha-lib-go/v3/pkg/adapters/common