VOL-2631 Cleanup Kafka Interadapter Proxy on shutdown;
Exit gracefully rather than Fatal() on kafka shutdown
Change-Id: Ic4506a8f4e5127d10a0db130a81584c0bf8c8428
diff --git a/VERSION b/VERSION
index bc4abe8..b29ec1d 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-2.3.8
+2.3.9-dev
diff --git a/adaptercore/resourcemanager/resourcemanager_test.go b/adaptercore/resourcemanager/resourcemanager_test.go
index 6fd3438..5d96cc3 100644
--- a/adaptercore/resourcemanager/resourcemanager_test.go
+++ b/adaptercore/resourcemanager/resourcemanager_test.go
@@ -218,7 +218,7 @@
}
// Watch mock function implementation for KVClient
-func (kvclient *MockResKVClient) Watch(ctx context.Context, key string) chan *kvstore.Event {
+func (kvclient *MockResKVClient) Watch(ctx context.Context, key string, withPrefix bool) chan *kvstore.Event {
return nil
}
diff --git a/go.mod b/go.mod
index b4f685f..1719306 100644
--- a/go.mod
+++ b/go.mod
@@ -7,7 +7,7 @@
github.com/cenkalti/backoff/v3 v3.1.1
github.com/gogo/protobuf v1.3.1
github.com/golang/protobuf v1.3.2
- github.com/opencord/voltha-lib-go/v3 v3.0.12
+ github.com/opencord/voltha-lib-go/v3 v3.0.14
github.com/opencord/voltha-protos/v3 v3.2.4
go.etcd.io/etcd v0.0.0-20190930204107-236ac2a90522
google.golang.org/grpc v1.25.1
diff --git a/go.sum b/go.sum
index a40d1db..d959ca8 100644
--- a/go.sum
+++ b/go.sum
@@ -196,8 +196,8 @@
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.2 h1:3mYCb7aPxS/RU7TI1y4rkEn1oKmPRjNJLNEXgw7MH2I=
github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
-github.com/opencord/voltha-lib-go/v3 v3.0.12 h1:YGpyjl0UxdVUpKuo/VO5eCi+1+5EbMK9ZUqjDvWUQWQ=
-github.com/opencord/voltha-lib-go/v3 v3.0.12/go.mod h1:69Y+rVd25Nq2SUeoY7Q1BXtwrcUPllG0erhq+aK8Qec=
+github.com/opencord/voltha-lib-go/v3 v3.0.14 h1:klI6Qt5iA9bTqI42jflSbU6YyEMJRcpaqh6rRw7qNnI=
+github.com/opencord/voltha-lib-go/v3 v3.0.14/go.mod h1:69Y+rVd25Nq2SUeoY7Q1BXtwrcUPllG0erhq+aK8Qec=
github.com/opencord/voltha-protos/v3 v3.2.3 h1:Wv73mw1Ye0bCfyhOk5svgrlE2tLizHq6tQluoDq9Vg8=
github.com/opencord/voltha-protos/v3 v3.2.3/go.mod h1:RIGHt7b80BHpHh3ceodknh0DxUjUHCWSbYbZqRx7Og0=
github.com/opencord/voltha-protos/v3 v3.2.4 h1:BTPpfwJslAY9UintQ9/f+wLD+StZ5QnPv6j7pbAk+J8=
diff --git a/main.go b/main.go
index 536e917..05479c9 100644
--- a/main.go
+++ b/main.go
@@ -202,17 +202,28 @@
livelinessChannel := a.kafkaClient.EnableLivenessChannel(true)
healthinessChannel := a.kafkaClient.EnableHealthinessChannel(true)
timeout := a.config.LiveProbeInterval
+ failed := false
for {
timeoutTimer := time.NewTimer(timeout)
select {
case healthiness := <-healthinessChannel:
if !healthiness {
- // log.Fatal will call os.Exit(1) to terminate
- log.Fatal("Kafka service has become unhealthy")
+ // This will eventually cause K8s to restart the container, and will do
+ // so in a way that allows cleanup to continue, rather than an immediate
+ // panic and exit here.
+ probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusFailed)
+ failed = true
+ }
+ // Check if the timer has expired or not
+ if !timeoutTimer.Stop() {
+ <-timeoutTimer.C
}
case liveliness := <-livelinessChannel:
- if !liveliness {
+ if failed {
+ // Failures of the message bus are permanent and can't ever be recovered from,
+ // so make sure we never inadvertently reset a failed state back to unready.
+ } else if !liveliness {
// kafka not reachable or down, updating the status to not ready state
probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
timeout = a.config.NotLiveProbeInterval
@@ -255,6 +266,10 @@
a.kvClient.Close()
}
+ if a.kip != nil {
+ a.kip.Stop()
+ }
+
// TODO: More cleanup
}
diff --git a/mocks/mockKVClient.go b/mocks/mockKVClient.go
index 456d196..2343994 100644
--- a/mocks/mockKVClient.go
+++ b/mocks/mockKVClient.go
@@ -222,7 +222,7 @@
}
// Watch mock function implementation for KVClient
-func (kvclient *MockKVClient) Watch(ctx context.Context, key string) chan *kvstore.Event {
+func (kvclient *MockKVClient) Watch(ctx context.Context, key string, withPrefix bool) chan *kvstore.Event {
return nil
// if key == "" {
// return nil
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/backend.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/backend.go
index 96829c5..04fe35d 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/backend.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/backend.go
@@ -247,14 +247,14 @@
}
// CreateWatch starts watching events for the specified key
-func (b *Backend) CreateWatch(ctx context.Context, key string) chan *kvstore.Event {
+func (b *Backend) CreateWatch(ctx context.Context, key string, withPrefix bool) chan *kvstore.Event {
b.Lock()
defer b.Unlock()
formattedPath := b.makePath(key)
logger.Debugw("creating-key-watch", log.Fields{"key": key, "path": formattedPath})
- return b.Client.Watch(ctx, formattedPath)
+ return b.Client.Watch(ctx, formattedPath, withPrefix)
}
// DeleteWatch stops watching events for the specified key
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/client.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/client.go
index d30e049..b9cb1ee 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/client.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/client.go
@@ -81,7 +81,7 @@
ReleaseReservation(ctx context.Context, key string) error
ReleaseAllReservations(ctx context.Context) error
RenewReservation(ctx context.Context, key string) error
- Watch(ctx context.Context, key string) chan *Event
+ Watch(ctx context.Context, key string, withPrefix bool) chan *Event
AcquireLock(ctx context.Context, lockName string, timeout int) error
ReleaseLock(lockName string) error
IsConnectionUp(ctx context.Context) bool // timeout in second
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/consulclient.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/consulclient.go
index fdf39be..bdf2d10 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/consulclient.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/consulclient.go
@@ -360,7 +360,7 @@
// Watch provides the watch capability on a given key. It returns a channel onto which the callee needs to
// listen to receive Events.
-func (c *ConsulClient) Watch(ctx context.Context, key string) chan *Event {
+func (c *ConsulClient) Watch(ctx context.Context, key string, withPrefix bool) chan *Event {
// Create a new channel
ch := make(chan *Event, maxClientChannelBufferSize)
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/etcdclient.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/etcdclient.go
index 2d126f7..1014ada 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/etcdclient.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/etcdclient.go
@@ -298,10 +298,15 @@
// Watch provides the watch capability on a given key. It returns a channel onto which the callee needs to
// listen to receive Events.
-func (c *EtcdClient) Watch(ctx context.Context, key string) chan *Event {
+func (c *EtcdClient) Watch(ctx context.Context, key string, withPrefix bool) chan *Event {
w := v3Client.NewWatcher(c.ectdAPI)
ctx, cancel := context.WithCancel(ctx)
- channel := w.Watch(ctx, key)
+ var channel v3Client.WatchChan
+ if withPrefix {
+ channel = w.Watch(ctx, key, v3Client.WithPrefix())
+ } else {
+ channel = w.Watch(ctx, key)
+ }
// Create a new channel
ch := make(chan *Event, maxClientChannelBufferSize)
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
index aa77ffb..91b2143 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
@@ -189,9 +189,15 @@
kp.doneOnce.Do(func() { close(kp.doneCh) })
// TODO : Perform cleanup
kp.kafkaClient.Stop()
- //kp.deleteAllTopicRequestHandlerChannelMap()
- //kp.deleteAllTopicResponseChannelMap()
- //kp.deleteAllTransactionIdToChannelMap()
+ err := kp.deleteAllTopicRequestHandlerChannelMap()
+ if err != nil {
+ log.Errorw("failed-delete-all-topic-request-handler-channel-map", log.Fields{"error": err})
+ }
+ err = kp.deleteAllTopicResponseChannelMap()
+ if err != nil {
+ log.Errorw("failed-delete-all-topic-response-channel-map", log.Fields{"error": err})
+ }
+ kp.deleteAllTransactionIdToChannelMap()
}
func (kp *interContainerProxy) GetDefaultTopic() *Topic {
@@ -409,17 +415,25 @@
// nolint: unused
func (kp *interContainerProxy) deleteAllTopicResponseChannelMap() error {
+ logger.Debug("delete-all-topic-response-channel")
kp.lockTopicResponseChannelMap.Lock()
defer kp.lockTopicResponseChannelMap.Unlock()
- var err error
+ var unsubscribeFailTopics []string
for topic := range kp.topicToResponseChannelMap {
// Unsubscribe to this topic first - this will close the subscribed channel
- if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
+ if err := kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
+ unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
+ // Do not return. Continue to try to unsubscribe to other topics.
+ } else {
+ // Only delete from channel map if successfully unsubscribed.
+ delete(kp.topicToResponseChannelMap, topic)
}
- delete(kp.topicToResponseChannelMap, topic)
}
- return err
+ if len(unsubscribeFailTopics) > 0 {
+ return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
+ }
+ return nil
}
func (kp *interContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
@@ -447,17 +461,25 @@
// nolint: unused
func (kp *interContainerProxy) deleteAllTopicRequestHandlerChannelMap() error {
+ logger.Debug("delete-all-topic-request-channel")
kp.lockTopicRequestHandlerChannelMap.Lock()
defer kp.lockTopicRequestHandlerChannelMap.Unlock()
- var err error
+ var unsubscribeFailTopics []string
for topic := range kp.topicToRequestHandlerChannelMap {
// Close the kafka client client first by unsubscribing to this topic
- if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
+ if err := kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
+ unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
+ // Do not return. Continue to try to unsubscribe to other topics.
+ } else {
+ // Only delete from channel map if successfully unsubscribed.
+ delete(kp.topicToRequestHandlerChannelMap, topic)
}
- delete(kp.topicToRequestHandlerChannelMap, topic)
}
- return err
+ if len(unsubscribeFailTopics) > 0 {
+ return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
+ }
+ return nil
}
func (kp *interContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
@@ -491,6 +513,7 @@
// nolint: unused
func (kp *interContainerProxy) deleteAllTransactionIdToChannelMap() {
+ logger.Debug("delete-all-transaction-id-channel-map")
kp.lockTransactionIdToChannelMap.Lock()
defer kp.lockTransactionIdToChannelMap.Unlock()
for key, value := range kp.transactionIdToChannelMap {
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/log/log.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/log/log.go
index 69e22a4..3ebdd3a 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/log/log.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/log/log.go
@@ -193,6 +193,22 @@
return 0, errors.New("Given LogLevel is invalid : " + l)
}
+func LogLevelToString(l LogLevel) (string, error) {
+ switch l {
+ case DebugLevel:
+ return "DEBUG", nil
+ case InfoLevel:
+ return "INFO", nil
+ case WarnLevel:
+ return "WARN", nil
+ case ErrorLevel:
+ return "ERROR", nil
+ case FatalLevel:
+ return "FATAL", nil
+ }
+ return "", errors.New("Given LogLevel is invalid " + string(l))
+}
+
func getDefaultConfig(outputType string, level LogLevel, defaultFields Fields) zp.Config {
return zp.Config{
Level: logLevelToAtomicLevel(level),
diff --git a/vendor/modules.txt b/vendor/modules.txt
index d52f5da..fcf7bc9 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -63,7 +63,7 @@
github.com/mitchellh/go-homedir
# github.com/mitchellh/mapstructure v1.1.2
github.com/mitchellh/mapstructure
-# github.com/opencord/voltha-lib-go/v3 v3.0.12
+# github.com/opencord/voltha-lib-go/v3 v3.0.14
github.com/opencord/voltha-lib-go/v3/pkg/adapters
github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif
github.com/opencord/voltha-lib-go/v3/pkg/adapters/common