VOL-3383 - add mutex around shared data

Change-Id: I676615beaa63561c36e06484b9b0261f1c6fb6f2
diff --git a/VERSION b/VERSION
index 5ae69bd..db5486c 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-3.2.5
+3.2.6-dev
diff --git a/pkg/db/backend.go b/pkg/db/backend.go
index f595dc1..60afe72 100644
--- a/pkg/db/backend.go
+++ b/pkg/db/backend.go
@@ -20,6 +20,7 @@
 	"context"
 	"errors"
 	"fmt"
+	"sync"
 	"time"
 
 	"github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
@@ -40,7 +41,8 @@
 	Timeout                 time.Duration
 	Address                 string
 	PathPrefix              string
-	alive                   bool          // Is this backend connection alive?
+	alive                   bool // Is this backend connection alive?
+	livenessMutex           sync.Mutex
 	liveness                chan bool     // channel to post alive state
 	LivenessChannelInterval time.Duration // regularly push alive state beyond this interval
 	lastLivenessTime        time.Time     // Instant of last alive state push
@@ -91,8 +93,9 @@
 	// so that in a live state, the core does not timeout and
 	// send a forced liveness message. Push alive state if the
 	// last push to channel was beyond livenessChannelInterval
+	b.livenessMutex.Lock()
+	defer b.livenessMutex.Unlock()
 	if b.liveness != nil {
-
 		if b.alive != alive {
 			logger.Debug(ctx, "update-liveness-channel-reason-change")
 			b.liveness <- alive
@@ -128,14 +131,10 @@
 // and/or take other actions.
 func (b *Backend) EnableLivenessChannel(ctx context.Context) chan bool {
 	logger.Debug(ctx, "enable-kvstore-liveness-channel")
-
+	b.livenessMutex.Lock()
+	defer b.livenessMutex.Unlock()
 	if b.liveness == nil {
-		logger.Debug(ctx, "create-kvstore-liveness-channel")
-
-		// Channel size of 10 to avoid any possibility of blocking in Load conditions
 		b.liveness = make(chan bool, 10)
-
-		// Post initial alive state
 		b.liveness <- b.alive
 		b.lastLivenessTime = time.Now()
 	}
diff --git a/pkg/kafka/sarama_client.go b/pkg/kafka/sarama_client.go
index 87c7ce4..69450fa 100755
--- a/pkg/kafka/sarama_client.go
+++ b/pkg/kafka/sarama_client.go
@@ -75,10 +75,12 @@
 	lockOfTopicLockMap            sync.RWMutex
 	metadataMaxRetry              int
 	alive                         bool
+	livenessMutex                 sync.Mutex
 	liveness                      chan bool
 	livenessChannelInterval       time.Duration
 	lastLivenessTime              time.Time
 	started                       bool
+	healthinessMutex              sync.Mutex
 	healthy                       bool
 	healthiness                   chan bool
 }
@@ -463,6 +465,8 @@
 	// so that in a live state, the core does not timeout and
 	// send a forced liveness message. Production of liveness
 	// events to the channel is rate-limited by livenessChannelInterval.
+	sc.livenessMutex.Lock()
+	defer sc.livenessMutex.Unlock()
 	if sc.liveness != nil {
 		if sc.alive != alive {
 			logger.Info(ctx, "update-liveness-channel-because-change")
@@ -485,6 +489,8 @@
 // Once unhealthy, we never go back
 func (sc *SaramaClient) setUnhealthy(ctx context.Context) {
 	sc.healthy = false
+	sc.healthinessMutex.Lock()
+	defer sc.healthinessMutex.Unlock()
 	if sc.healthiness != nil {
 		logger.Infow(ctx, "set-client-unhealthy", log.Fields{"healthy": sc.healthy})
 		sc.healthiness <- sc.healthy
@@ -594,6 +600,8 @@
 func (sc *SaramaClient) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
 	logger.Infow(ctx, "kafka-enable-liveness-channel", log.Fields{"enable": enable})
 	if enable {
+		sc.livenessMutex.Lock()
+		defer sc.livenessMutex.Unlock()
 		if sc.liveness == nil {
 			logger.Info(ctx, "kafka-create-liveness-channel")
 			// At least 1, so we can immediately post to it without blocking
@@ -618,6 +626,8 @@
 func (sc *SaramaClient) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool {
 	logger.Infow(ctx, "kafka-enable-healthiness-channel", log.Fields{"enable": enable})
 	if enable {
+		sc.healthinessMutex.Lock()
+		defer sc.healthinessMutex.Unlock()
 		if sc.healthiness == nil {
 			logger.Info(ctx, "kafka-create-healthiness-channel")
 			// At least 1, so we can immediately post to it without blocking