VOL-3383 - add mutex around shared data
Change-Id: I676615beaa63561c36e06484b9b0261f1c6fb6f2
diff --git a/VERSION b/VERSION
index 5ae69bd..db5486c 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-3.2.5
+3.2.6-dev
diff --git a/pkg/db/backend.go b/pkg/db/backend.go
index f595dc1..60afe72 100644
--- a/pkg/db/backend.go
+++ b/pkg/db/backend.go
@@ -20,6 +20,7 @@
"context"
"errors"
"fmt"
+ "sync"
"time"
"github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
@@ -40,7 +41,8 @@
Timeout time.Duration
Address string
PathPrefix string
- alive bool // Is this backend connection alive?
+ alive bool // Is this backend connection alive?
+ livenessMutex sync.Mutex
liveness chan bool // channel to post alive state
LivenessChannelInterval time.Duration // regularly push alive state beyond this interval
lastLivenessTime time.Time // Instant of last alive state push
@@ -91,8 +93,9 @@
// so that in a live state, the core does not timeout and
// send a forced liveness message. Push alive state if the
// last push to channel was beyond livenessChannelInterval
+ b.livenessMutex.Lock()
+ defer b.livenessMutex.Unlock()
if b.liveness != nil {
-
if b.alive != alive {
logger.Debug(ctx, "update-liveness-channel-reason-change")
b.liveness <- alive
@@ -128,14 +131,10 @@
// and/or take other actions.
func (b *Backend) EnableLivenessChannel(ctx context.Context) chan bool {
logger.Debug(ctx, "enable-kvstore-liveness-channel")
-
+ b.livenessMutex.Lock()
+ defer b.livenessMutex.Unlock()
if b.liveness == nil {
- logger.Debug(ctx, "create-kvstore-liveness-channel")
-
- // Channel size of 10 to avoid any possibility of blocking in Load conditions
b.liveness = make(chan bool, 10)
-
- // Post initial alive state
b.liveness <- b.alive
b.lastLivenessTime = time.Now()
}
diff --git a/pkg/kafka/sarama_client.go b/pkg/kafka/sarama_client.go
index 87c7ce4..69450fa 100755
--- a/pkg/kafka/sarama_client.go
+++ b/pkg/kafka/sarama_client.go
@@ -75,10 +75,12 @@
lockOfTopicLockMap sync.RWMutex
metadataMaxRetry int
alive bool
+ livenessMutex sync.Mutex
liveness chan bool
livenessChannelInterval time.Duration
lastLivenessTime time.Time
started bool
+ healthinessMutex sync.Mutex
healthy bool
healthiness chan bool
}
@@ -463,6 +465,8 @@
// so that in a live state, the core does not timeout and
// send a forced liveness message. Production of liveness
// events to the channel is rate-limited by livenessChannelInterval.
+ sc.livenessMutex.Lock()
+ defer sc.livenessMutex.Unlock()
if sc.liveness != nil {
if sc.alive != alive {
logger.Info(ctx, "update-liveness-channel-because-change")
@@ -485,6 +489,8 @@
// Once unhealthy, we never go back
func (sc *SaramaClient) setUnhealthy(ctx context.Context) {
+ sc.healthinessMutex.Lock()
+ defer sc.healthinessMutex.Unlock()
sc.healthy = false
if sc.healthiness != nil {
logger.Infow(ctx, "set-client-unhealthy", log.Fields{"healthy": sc.healthy})
sc.healthiness <- sc.healthy
@@ -594,6 +600,8 @@
func (sc *SaramaClient) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
logger.Infow(ctx, "kafka-enable-liveness-channel", log.Fields{"enable": enable})
if enable {
+ sc.livenessMutex.Lock()
+ defer sc.livenessMutex.Unlock()
if sc.liveness == nil {
logger.Info(ctx, "kafka-create-liveness-channel")
// At least 1, so we can immediately post to it without blocking
@@ -618,6 +626,8 @@
func (sc *SaramaClient) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool {
logger.Infow(ctx, "kafka-enable-healthiness-channel", log.Fields{"enable": enable})
if enable {
+ sc.healthinessMutex.Lock()
+ defer sc.healthinessMutex.Unlock()
if sc.healthiness == nil {
logger.Info(ctx, "kafka-create-healthiness-channel")
// At least 1, so we can immediately post to it without blocking