[VOL-1359] This commit creates the simulated OLT and ONU adapters
(written in Go).  It also provides the set of files needed to build
and run these containers.

Change-Id: Id7b0c77fdf60cb02c39908d4374d3e93fab5de67
diff --git a/kafka/sarama_client.go b/kafka/sarama_client.go
index e330b85..2df19e5 100644
--- a/kafka/sarama_client.go
+++ b/kafka/sarama_client.go
@@ -69,6 +69,8 @@
 	doneCh                        chan int
 	topicToConsumerChannelMap     map[string]*consumerChannels
 	lockTopicToConsumerChannelMap sync.RWMutex
+	topicLockMap                  map[string]*sync.RWMutex
+	lockOfTopicLockMap            sync.RWMutex
 }
 
 type SaramaClientOption func(*SaramaClient)
@@ -187,7 +189,8 @@
 	}
 
 	client.lockTopicToConsumerChannelMap = sync.RWMutex{}
-
+	client.topicLockMap = make(map[string]*sync.RWMutex)
+	client.lockOfTopicLockMap = sync.RWMutex{}
 	return client
 }
 
@@ -261,6 +264,9 @@
 
 //CreateTopic creates a topic on the Kafka Broker.
 func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
+	sc.lockTopic(topic)
+	defer sc.unLockTopic(topic)
+
 	// Set the topic details
 	topicDetail := &sarama.TopicDetail{}
 	topicDetail.NumPartitions = int32(numPartition)
@@ -286,6 +292,9 @@
 
 //DeleteTopic removes a topic from the kafka Broker
 func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
+	sc.lockTopic(topic)
+	defer sc.unLockTopic(topic)
+
 	// Remove the topic from the broker
 	if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
 		if err == sarama.ErrUnknownTopicOrPartition {
@@ -308,6 +317,9 @@
 // Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
 // messages from that topic
 func (sc *SaramaClient) Subscribe(topic *Topic) (<-chan *ic.InterContainerMessage, error) {
+	sc.lockTopic(topic)
+	defer sc.unLockTopic(topic)
+
 	log.Debugw("subscribe", log.Fields{"topic": topic.Name})
 
 	// If a consumers already exist for that topic then resuse it
@@ -352,6 +364,9 @@
 
 //UnSubscribe unsubscribe a consumer from a given topic
 func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ic.InterContainerMessage) error {
+	sc.lockTopic(topic)
+	defer sc.unLockTopic(topic)
+
 	log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
 	err := sc.removeChannelFromConsumerChannelMap(*topic, ch)
 	return err
@@ -417,6 +432,26 @@
 	return nil
 }
 
+// lockTopic acquires the per-topic lock for topic.Name, creating it on first use.
+func (sc *SaramaClient) lockTopic(topic *Topic) {
+	sc.lockOfTopicLockMap.Lock()
+	topicLock, exist := sc.topicLockMap[topic.Name]
+	if !exist {
+		topicLock = &sync.RWMutex{}
+		sc.topicLockMap[topic.Name] = topicLock
+	}
+	sc.lockOfTopicLockMap.Unlock()
+	topicLock.Lock() // map lock is released first so waiting here never blocks other topics
+}
+
+func (sc *SaramaClient) unLockTopic(topic *Topic) { // releases the lock registered for topic.Name
+	sc.lockOfTopicLockMap.Lock()
+	defer sc.lockOfTopicLockMap.Unlock() // map lock stays held until the topic lock is released
+	if topicLock, found := sc.topicLockMap[topic.Name]; found {
+		topicLock.Unlock() // topics with no registered lock are silently ignored
+	}
+}
+
 func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
 	sc.lockTopicToConsumerChannelMap.Lock()
 	defer sc.lockTopicToConsumerChannelMap.Unlock()