[VOL-1359] This commit consists of the creation of the simulated
OLT and ONU adapters (in Go language). This update also provides
the set of files to build and run these containers.
Change-Id: Id7b0c77fdf60cb02c39908d4374d3e93fab5de67
diff --git a/kafka/sarama_client.go b/kafka/sarama_client.go
index e330b85..2df19e5 100644
--- a/kafka/sarama_client.go
+++ b/kafka/sarama_client.go
@@ -69,6 +69,8 @@
doneCh chan int
topicToConsumerChannelMap map[string]*consumerChannels
lockTopicToConsumerChannelMap sync.RWMutex
+ topicLockMap map[string]*sync.RWMutex
+ lockOfTopicLockMap sync.RWMutex
}
type SaramaClientOption func(*SaramaClient)
@@ -187,7 +189,8 @@
}
client.lockTopicToConsumerChannelMap = sync.RWMutex{}
-
+ client.topicLockMap = make(map[string]*sync.RWMutex)
+ client.lockOfTopicLockMap = sync.RWMutex{}
return client
}
@@ -261,6 +264,9 @@
//CreateTopic creates a topic on the Kafka Broker.
func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
+ sc.lockTopic(topic)
+ defer sc.unLockTopic(topic)
+
// Set the topic details
topicDetail := &sarama.TopicDetail{}
topicDetail.NumPartitions = int32(numPartition)
@@ -286,6 +292,9 @@
//DeleteTopic removes a topic from the kafka Broker
func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
+ sc.lockTopic(topic)
+ defer sc.unLockTopic(topic)
+
// Remove the topic from the broker
if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
if err == sarama.ErrUnknownTopicOrPartition {
@@ -308,6 +317,9 @@
// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
// messages from that topic
func (sc *SaramaClient) Subscribe(topic *Topic) (<-chan *ic.InterContainerMessage, error) {
+ sc.lockTopic(topic)
+ defer sc.unLockTopic(topic)
+
log.Debugw("subscribe", log.Fields{"topic": topic.Name})
// If a consumers already exist for that topic then resuse it
@@ -352,6 +364,9 @@
//UnSubscribe unsubscribe a consumer from a given topic
func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ic.InterContainerMessage) error {
+ sc.lockTopic(topic)
+ defer sc.unLockTopic(topic)
+
log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
err := sc.removeChannelFromConsumerChannelMap(*topic, ch)
return err
@@ -417,6 +432,26 @@
return nil
}
+func (sc *SaramaClient) lockTopic(topic *Topic) {
+ sc.lockOfTopicLockMap.Lock()
+ if _, exist := sc.topicLockMap[topic.Name]; exist {
+ sc.lockOfTopicLockMap.Unlock()
+ sc.topicLockMap[topic.Name].Lock()
+ } else {
+ sc.topicLockMap[topic.Name] = &sync.RWMutex{}
+ sc.lockOfTopicLockMap.Unlock()
+ sc.topicLockMap[topic.Name].Lock()
+ }
+}
+
+func (sc *SaramaClient) unLockTopic(topic *Topic) {
+ sc.lockOfTopicLockMap.Lock()
+ defer sc.lockOfTopicLockMap.Unlock()
+ if _, exist := sc.topicLockMap[topic.Name]; exist {
+ sc.topicLockMap[topic.Name].Unlock()
+ }
+}
+
func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
sc.lockTopicToConsumerChannelMap.Lock()
defer sc.lockTopicToConsumerChannelMap.Unlock()